You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 01:08:50 UTC
[3/8] git commit: DRILL-679: Support create table as query (CTAS)
(contd.).
DRILL-679: Support create table as query (CTAS) (contd.).
Continuation to e19606593f3173d8f82ca3074186e9ca7a960ce2.
Refactoring and align the writer interfaces similar to reader interfaces at the storage and file format level.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5b7f351e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5b7f351e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5b7f351e
Branch: refs/heads/master
Commit: 5b7f351e3bbc26d5e9a686cd5b0c636bb0b94b2e
Parents: 6dad590
Author: vkorukanti <ve...@gmail.com>
Authored: Fri May 9 23:17:00 2014 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sat May 10 00:25:07 2014 -0700
----------------------------------------------------------------------
.../drill/common/logical/WorkspaceConfig.java | 72 --------
exec/java-exec/src/main/codegen/data/Parser.tdd | 3 +-
.../templates/StringOutputRecordWriter.java | 5 +-
.../exec/physical/base/AbstractWriter.java | 30 ++++
.../apache/drill/exec/physical/base/Writer.java | 24 +++
.../drill/exec/physical/config/Writer.java | 69 -------
.../exec/physical/impl/WriterRecordBatch.java | 174 ++++++++++++++++++
.../impl/writer/WriterBatchCreator.java | 36 ----
.../physical/impl/writer/WriterRecordBatch.java | 179 -------------------
.../exec/planner/logical/CreateTableEntry.java | 5 +-
.../logical/FileSystemCreateTableEntry.java | 73 ++++----
.../drill/exec/planner/physical/WriterPrel.java | 3 +-
.../drill/exec/planner/sql/DrillSqlWorker.java | 4 +-
.../sql/handlers/AbstractSqlHandler.java | 64 +++++++
.../planner/sql/handlers/DefaultSqlHandler.java | 2 +-
.../planner/sql/handlers/SetOptionHandler.java | 2 +-
.../exec/planner/sql/handlers/SqlHandler.java | 64 -------
.../planner/sql/handlers/UseSchemaHandler.java | 2 +-
.../exec/planner/sql/handlers/ViewHandler.java | 2 +-
.../exec/planner/sql/parser/DrillSqlCall.java | 4 +-
.../exec/planner/sql/parser/SqlCreateTable.java | 4 +-
.../exec/planner/sql/parser/SqlCreateView.java | 4 +-
.../planner/sql/parser/SqlDescribeTable.java | 4 +-
.../exec/planner/sql/parser/SqlDropView.java | 4 +-
.../exec/planner/sql/parser/SqlShowFiles.java | 4 +-
.../exec/planner/sql/parser/SqlShowSchemas.java | 4 +-
.../exec/planner/sql/parser/SqlShowTables.java | 4 +-
.../exec/planner/sql/parser/SqlUseSchema.java | 4 +-
.../drill/exec/store/RecordWriterRegistry.java | 91 ----------
.../drill/exec/store/dfs/FileSystemConfig.java | 1 -
.../drill/exec/store/dfs/FileSystemPlugin.java | 4 -
.../exec/store/dfs/FileSystemSchemaFactory.java | 6 +
.../drill/exec/store/dfs/FormatPlugin.java | 4 +
.../drill/exec/store/dfs/WorkspaceConfig.java | 72 ++++++++
.../exec/store/dfs/WorkspaceSchemaFactory.java | 23 ++-
.../exec/store/dfs/easy/EasyBatchCreator.java | 38 ----
.../exec/store/dfs/easy/EasyFormatPlugin.java | 25 ++-
.../store/dfs/easy/EasyReaderBatchCreator.java | 36 ++++
.../drill/exec/store/dfs/easy/EasyWriter.java | 97 ++++++++++
.../store/dfs/easy/EasyWriterBatchCreator.java | 36 ++++
.../exec/store/easy/json/JSONFormatPlugin.java | 8 +
.../exec/store/easy/text/TextFormatPlugin.java | 34 +++-
.../exec/store/parquet/ParquetFormatPlugin.java | 7 +
.../exec/store/text/DrillTextRecordWriter.java | 133 ++++++++++++++
.../exec/store/writer/RecordWriterTemplate.java | 35 ----
.../exec/store/writer/csv/CSVRecordWriter.java | 131 --------------
.../exec/physical/impl/writer/TestWriter.java | 54 +++++-
.../src/test/resources/storage-plugins.json | 15 +-
.../resources/writer/simple_csv_writer.json | 87 +++++----
49 files changed, 942 insertions(+), 844 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java b/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
deleted file mode 100644
index cd1b1c4..0000000
--- a/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.common.logical;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Stores the workspace related config. A workspace has:
- * - location which is a path.
- * - writable flag to indicate whether the location supports creating new tables.
- * - default storage format for new tables created in this workspace.
- */
-public class WorkspaceConfig {
-
- /** Default workspace is a root directory which supports read, but not write. */
- public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null);
-
- private final String location;
- private final boolean writable;
- private final String storageformat;
-
- public WorkspaceConfig(@JsonProperty("location") String location,
- @JsonProperty("writable") boolean writable,
- @JsonProperty("storageformat") String storageformat) {
- this.location = location;
- this.writable = writable;
- this.storageformat = storageformat;
- }
-
- public String getLocation() {
- return location;
- }
-
- public boolean isWritable() {
- return writable;
- }
-
- @JsonProperty("storageformat")
- public String getStorageFormat() {
- return storageformat;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this)
- return true;
-
- if (obj == null || !(obj instanceof WorkspaceConfig)) {
- return false;
- }
-
- WorkspaceConfig that = (WorkspaceConfig) obj;
- return ((this.location == null && that.location == null) || this.location.equals(that.location)) &&
- this.writable == that.writable &&
- ((this.storageformat == null && that.storageformat == null) || this.storageformat.equals(that.storageformat));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 581d645..d781f32 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -45,8 +45,7 @@
"SqlUseSchema()",
"SqlCreateOrReplaceView()",
"SqlDropView()",
- "SqlShowFiles()"
- "SqlDropView()",
+ "SqlShowFiles()",
"SqlCreateTable()"
]
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 629734f..506cace 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -114,7 +114,10 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
<#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
addField(fieldId, valueHolder.toString());
- </#if>
+ <#else>
+ throw new UnsupportedOperationException(String.format("Unsupported field type: %s"),
+ valueHolder.getCanonicalClass());
+ </#if>
}
</#list>
</#list>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
new file mode 100644
index 0000000..af23d5f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+public abstract class AbstractWriter extends AbstractSingle implements Writer{
+
+ public AbstractWriter(PhysicalOperator child) {
+ super(child);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitWriter(this, value);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
new file mode 100644
index 0000000..f33bf0a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.base;
+
+/** Writer physical operator */
+public interface Writer extends PhysicalOperator{
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
deleted file mode 100644
index 7140bcd..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractSingle;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.Size;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.planner.logical.CreateTableEntry;
-
-@JsonTypeName("writer")
-public class Writer extends AbstractSingle {
-
- public final CreateTableEntry createTableEntry;
-
- public Writer(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("createTableEntry") CreateTableEntry createTableEntry) {
- super(child);
- this.createTableEntry = createTableEntry;
- }
-
- public CreateTableEntry getCreateTableEntry() {
- return createTableEntry;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
- return physicalVisitor.visitWriter(this, value);
- }
-
- @Override
- public OperatorCost getCost() {
- /* Compute the total size (row count * row size) */
- Size size = child.getSize();
- long diskSize = size.getRecordCount() * size.getRecordSize();
-
- return new OperatorCost(0, diskSize, 0, child.getSize().getRecordCount());
- }
-
- @Override
- public Size getSize() {
- return child.getSize();
- }
-
- @Override
- protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new Writer(child, createTableEntry);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
new file mode 100644
index 0000000..81a4d58
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.RecordValueAccessor;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+
+import java.io.IOException;
+
+/* Write the RecordBatch to the given RecordWriter. */
+public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
+
+ private EventBasedRecordWriter eventBasedRecordWriter;
+ private RecordWriter recordWriter;
+ private int counter = 0;
+ private final RecordBatch incoming;
+ private boolean first = true;
+ private boolean processed = false;
+ private String fragmentUniqueId;
+
+ public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+ super(writer, context);
+ this.incoming = incoming;
+
+ FragmentHandle handle = context.getHandle();
+ fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+ this.recordWriter = recordWriter;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return container.getRecordCount();
+ }
+
+ @Override
+ protected void killIncoming() {
+ incoming.kill();
+ }
+
+ @Override
+ public IterOutcome next() {
+ if(processed) {
+ // if the upstream record batch is already processed and next() is called by
+ // downstream then return NONE to indicate completion
+ return IterOutcome.NONE;
+ }
+
+ // process the complete upstream in one next() call
+ IterOutcome upstream;
+ do {
+ upstream = incoming.next();
+ if(first && upstream == IterOutcome.OK)
+ upstream = IterOutcome.OK_NEW_SCHEMA;
+ first = false;
+
+ switch(upstream) {
+ case NOT_YET:
+ case NONE:
+ case STOP:
+ cleanup();
+ if (upstream == IterOutcome.STOP)
+ return upstream;
+ break;
+
+ case OK_NEW_SCHEMA:
+ try{
+ setupNewSchema();
+ }catch(Exception ex){
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ try {
+ counter += eventBasedRecordWriter.write();
+ logger.debug("Total records written so far: {}", counter);
+ } catch(IOException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ for(VectorWrapper v : incoming)
+ v.getValueVector().clear();
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } while(upstream != IterOutcome.NONE);
+
+ // Create two vectors for:
+ // 1. Fragment unique id.
+ // 2. Summary: currently contains number of records written.
+ MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
+ MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
+
+ VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
+ AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+ BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
+ AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+
+
+ container.add(fragmentIdVector);
+ container.add(summaryVector);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
+ fragmentIdVector.getMutator().setValueCount(1);
+ summaryVector.getMutator().setSafe(0, counter);
+ summaryVector.getMutator().setValueCount(1);
+
+ container.setRecordCount(1);
+ processed = true;
+
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
+
+ protected void setupNewSchema() throws Exception {
+ try {
+ // update the schema in RecordWriter
+ recordWriter.updateSchema(incoming.getSchema());
+ } catch(IOException ex) {
+ throw new RuntimeException("Failed to update schema in RecordWriter", ex);
+ }
+
+ eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
+ new RecordValueAccessor(incoming), recordWriter);
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ incoming.cleanup();
+ try {
+ if (recordWriter != null) {
+ recordWriter.cleanup();
+ }
+ } catch(IOException ex) {
+ throw new RuntimeException("Failed to close RecordWriter", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
deleted file mode 100644
index 4d7baad..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.impl.writer;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Writer;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import java.util.List;
-
-public class WriterBatchCreator implements BatchCreator<Writer> {
-
- @Override
- public RecordBatch getBatch(FragmentContext context, Writer config, List<RecordBatch> children)
- throws ExecutionSetupException {
- return new WriterRecordBatch(config, children.iterator().next(), context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
deleted file mode 100644
index 81b2f5c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.impl.writer;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Writer;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.RecordValueAccessor;
-import org.apache.drill.exec.store.EventBasedRecordWriter;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-
-import java.io.IOException;
-
-/* Write the given RecordBatch to the given file in specified format. */
-public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
-
- private EventBasedRecordWriter eventBasedRecordWriter;
- private RecordWriter recordWriter;
- private int counter = 0;
- private final RecordBatch incoming;
- private boolean first = true;
- private boolean processed = false;
- private String fragmentUniqueId;
-
- public WriterRecordBatch(Writer pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
- super(pop, context);
- this.incoming = incoming;
-
- FragmentHandle handle = context.getHandle();
- fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
- try {
- recordWriter = pop.getCreateTableEntry().getRecordWriter(fragmentUniqueId);
- } catch(IOException ex) {
- throw new RuntimeException("Failed to create RecordWriter", ex);
- }
- }
-
- @Override
- public int getRecordCount() {
- return container.getRecordCount();
- }
-
- @Override
- protected void killIncoming() {
- incoming.kill();
- }
-
- @Override
- public IterOutcome next() {
- if(processed) {
- // if the upstream record batch is already processed and next() called by
- // downstream then return NONE to indicate completion
- return IterOutcome.NONE;
- }
-
- // process the complete upstream in one next() call
- IterOutcome upstream;
- do {
- upstream = incoming.next();
- if(first && upstream == IterOutcome.OK)
- upstream = IterOutcome.OK_NEW_SCHEMA;
- first = false;
-
- switch(upstream) {
- case NOT_YET:
- case NONE:
- case STOP:
- cleanup();
- if (upstream == IterOutcome.STOP)
- return upstream;
- break;
-
- case OK_NEW_SCHEMA:
- try{
- setupNewSchema();
- }catch(Exception ex){
- kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- try {
- counter += eventBasedRecordWriter.write();
- logger.debug("Total records written so far: {}", counter);
- } catch(IOException ex) {
- throw new RuntimeException(ex);
- }
-
- for(VectorWrapper v : incoming)
- v.getValueVector().clear();
-
- break;
-
- default:
- throw new UnsupportedOperationException();
- }
- } while(upstream != IterOutcome.NONE);
-
- // Create two vectors for:
- // 1. Fragment unique id.
- // 2. Summary: currently contains number of records written.
- MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("fragment"), Types.required(MinorType.VARCHAR));
- MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
-
- VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
- AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
- BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
- AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-
-
- container.add(fragmentIdVector);
- container.add(summaryVector);
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
- fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
- fragmentIdVector.getMutator().setValueCount(1);
- summaryVector.getMutator().setSafe(0, counter);
- summaryVector.getMutator().setValueCount(1);
-
- container.setRecordCount(1);
- processed = true;
-
- return IterOutcome.OK_NEW_SCHEMA;
- }
-
- protected void setupNewSchema() throws Exception {
- try {
- // update the schema in RecordWriter
- recordWriter.updateSchema(incoming.getSchema());
- } catch(IOException ex) {
- throw new RuntimeException("Failed to update schema in RecordWriter", ex);
- }
-
- eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
- new RecordValueAccessor(incoming), recordWriter);
- }
-
- @Override
- public void cleanup() {
- super.cleanup();
- incoming.cleanup();
- try {
- if (recordWriter != null) {
- recordWriter.cleanup();
- }
- } catch(IOException ex) {
- throw new RuntimeException("Failed to close RecordWriter", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
index 90e3a79..7f6acd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
@@ -21,7 +21,8 @@ package org.apache.drill.exec.planner.logical;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
import java.io.IOException;
@@ -35,5 +36,5 @@ import java.io.IOException;
})
public interface CreateTableEntry {
- RecordWriter getRecordWriter(String fragmentUniqueId) throws IOException;
+ Writer getWriter(PhysicalOperator child) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 78f2007..88d17a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -17,16 +17,19 @@
*/
package org.apache.drill.exec.planner.logical;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.ImmutableMap;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.RecordWriterRegistry;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
import java.io.IOException;
-import java.util.Map;
/**
* Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage.
@@ -34,39 +37,41 @@ import java.util.Map;
@JsonTypeName("filesystem")
public class FileSystemCreateTableEntry implements CreateTableEntry {
- public FileSystemConfig config;
- public String format;
- public String location;
+ private FileSystemConfig storageConfig;
+ private FormatPlugin formatPlugin;
+ private String location;
- /**
- * Create an entry.
- * @param config Storage configuration.
- * @param format Output format such as "csv", "parquet" etc.
- * @param location Directory where the data files for the new table are created.
- */
- public FileSystemCreateTableEntry(@JsonProperty("config") FileSystemConfig config,
- @JsonProperty("format") String format,
- @JsonProperty("location") String location) {
- this.config = config;
- this.format = format;
+ @JsonCreator
+ public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
+ @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
+ @JsonProperty("location") String location,
+ @JacksonInject StoragePluginRegistry engineRegistry)
+ throws ExecutionSetupException {
+ this.storageConfig = storageConfig;
+ this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
this.location = location;
}
- /**
- * Returns an implementation of the RecordWriter which is used to write data to new table.
- * @param prefix Fragment unique identifier to prefix to files created by RecordWriter.
- * This is needed to avoid fragments overwriting file in parallel overwriting others.
- * @return A RecordWriter object.
- * @throws IOException
- */
- @Override
- public RecordWriter getRecordWriter(String prefix) throws IOException {
- Map<String, String> options = ImmutableMap.of(
- "location", location,
- FileSystem.FS_DEFAULT_NAME_KEY, config.connection,
- "prefix", prefix
- );
+ public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
+ FormatPlugin formatPlugin,
+ String location) {
+ this.storageConfig = storageConfig;
+ this.formatPlugin = formatPlugin;
+ this.location = location;
+ }
+
+ @JsonProperty("storageConfig")
+ public FileSystemConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @JsonProperty("formatConfig")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
- return RecordWriterRegistry.get(format, options);
+ @Override
+ public Writer getWriter(PhysicalOperator child) throws IOException {
+ return formatPlugin.getWriter(child, location);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index a1f1dec..98a42de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.Writer;
import org.apache.drill.exec.planner.common.DrillWriterRelBase;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.eigenbase.rel.RelNode;
@@ -42,7 +41,7 @@ public class WriterPrel extends DrillWriterRelBase implements Prel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getChild();
- return new Writer(child.getPhysicalOperator(creator), getCreateTableEntry());
+ return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 18b645b..bd57785 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -35,10 +35,10 @@ import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable;
@@ -109,7 +109,7 @@ public class DrillSqlWorker {
public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{
SqlNode sqlNode = planner.parse(sql);
- SqlHandler handler;
+ AbstractSqlHandler handler;
// TODO: make this use path scanning or something similar.
switch(sqlNode.getKind()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
new file mode 100644
index 0000000..492c60a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+
+import com.google.common.base.Joiner;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RelConversionException;
+import net.hydromatic.optiq.tools.ValidationException;
+
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.eigenbase.sql.SqlNode;
+
+public abstract class AbstractSqlHandler {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSqlHandler.class);
+
+ public abstract PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException;
+
+ public static <T> T unwrap(Object o, Class<T> clazz) throws RelConversionException{
+ if(clazz.isAssignableFrom(o.getClass())){
+ return (T) o;
+ }else{
+ throw new RelConversionException(String.format("Failure trying to treat %s as type %s.", o.getClass().getSimpleName(), clazz.getSimpleName()));
+ }
+ }
+
+ /**
+ * From a given SchemaPlus return a mutable Drill schema object AbstractSchema if exists.
+ * Otherwise throw errors.
+ */
+ public static AbstractSchema getMutableDrillSchema(SchemaPlus schemaPlus) throws Exception{
+ AbstractSchema drillSchema;
+ try {
+ drillSchema = schemaPlus.unwrap(AbstractSchema.class);
+ drillSchema = drillSchema.getDefaultSchema();
+ } catch(ClassCastException e) {
+ throw new Exception("Current schema is not a Drill schema. " +
+ "Can't create new relations (tables or views) in non-Drill schemas.", e);
+ }
+
+ if (!drillSchema.isMutable())
+ throw new Exception(String.format("Current schema '%s' is not a mutable schema. " +
+ "Can't create new relations.", Joiner.on(".").join(drillSchema.getSchemaPath())));
+
+ return drillSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 84517ad..d107c29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -51,7 +51,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class DefaultSqlHandler extends SqlHandler{
+public class DefaultSqlHandler extends AbstractSqlHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSqlHandler.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 5033e50..eafc8d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -29,7 +29,7 @@ import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlSetOption;
-public class SetOptionHandler extends SqlHandler{
+public class SetOptionHandler extends AbstractSqlHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
QueryContext context;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
deleted file mode 100644
index 1b23c1e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.io.IOException;
-
-import com.google.common.base.Joiner;
-import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.tools.RelConversionException;
-import net.hydromatic.optiq.tools.ValidationException;
-
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.eigenbase.sql.SqlNode;
-
-public abstract class SqlHandler {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlHandler.class);
-
- public abstract PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException;
-
- public static <T> T unwrap(Object o, Class<T> clazz) throws RelConversionException{
- if(clazz.isAssignableFrom(o.getClass())){
- return (T) o;
- }else{
- throw new RelConversionException(String.format("Failure trying to treat %s as type %s.", o.getClass().getSimpleName(), clazz.getSimpleName()));
- }
- }
-
- /**
- * From a given SchemaPlus return a mutable Drill schema object AbstractSchema if exists.
- * Otherwise throw errors.
- */
- public static AbstractSchema getMutableDrillSchema(SchemaPlus schemaPlus) throws Exception{
- AbstractSchema drillSchema;
- try {
- drillSchema = schemaPlus.unwrap(AbstractSchema.class);
- drillSchema = drillSchema.getDefaultSchema();
- } catch(ClassCastException e) {
- throw new Exception("Current schema is not a Drill schema. " +
- "Can't create new relations (tables or views) in non-Drill schemas.", e);
- }
-
- if (!drillSchema.isMutable())
- throw new Exception(String.format("Current schema '%s' is not a mutable schema. " +
- "Can't create new relations.", Joiner.on(".").join(drillSchema.getSchemaPath())));
-
- return drillSchema;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
index 5b12259..9b23e23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
import org.eigenbase.sql.SqlNode;
-public class UseSchemaHandler extends SqlHandler{
+public class UseSchemaHandler extends AbstractSqlHandler{
QueryContext context;
public UseSchemaHandler(QueryContext context) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
index 5c221e7..babe015 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
@@ -38,7 +38,7 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.sql.SqlNode;
-public abstract class ViewHandler extends SqlHandler{
+public abstract class ViewHandler extends AbstractSqlHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewHandler.class);
protected Planner planner;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
index 32f57b4..5fa592a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -33,7 +33,7 @@ public abstract class DrillSqlCall extends SqlCall {
super(pos);
}
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new DefaultSqlHandler(planner, context);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index ed394d7..c29296b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.CreateTableHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -72,7 +72,7 @@ public class SqlCreateTable extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new CreateTableHandler(planner, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
index b124e1d..68176ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableList;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ViewHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -79,7 +79,7 @@ public class SqlCreateView extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new ViewHandler.CreateView(planner, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
index 0a666de..30c7e43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -71,7 +71,7 @@ public class SqlDescribeTable extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new DescribeTableHandler(planner, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
index 8ba7edd..7afe428 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
import com.google.common.collect.ImmutableList;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ViewHandler.DropView;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -55,7 +55,7 @@ public class SqlDropView extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new DropView(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
index b7fba06..f882ba9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.planner.sql.parser;
import com.google.common.collect.Lists;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler;
import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -64,7 +64,7 @@ public class SqlShowFiles extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new ShowFileHandler(planner, context);
}
public SqlIdentifier getDb() { return db; }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
index aa212a7..34695d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.ShowSchemasHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -70,7 +70,7 @@ public class SqlShowSchemas extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new ShowSchemasHandler(planner, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
index e3be378..26d4fa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -74,7 +74,7 @@ public class SqlShowTables extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new ShowTablesHandler(planner, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
index 92d3aff..42a3914 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
import com.google.common.collect.ImmutableList;
import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
import org.eigenbase.sql.*;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -59,7 +59,7 @@ public class SqlUseSchema extends DrillSqlCall {
}
@Override
- public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+ public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
return new UseSchemaHandler(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
deleted file mode 100644
index 8c79fed..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import com.google.common.collect.Maps;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.store.writer.RecordWriterTemplate;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class RecordWriterRegistry {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordWriterRegistry.class);
-
- // Contains mapping of "format" to RecordWriter implementation class.
- private static Map<String, Class<? extends RecordWriter>> formatRegistry;
-
- // Contains mapping of RecordWriter class to standard constructor that is used to instantiate the RecordWriter object.
- private static Map<Class<? extends RecordWriter>, Constructor<? extends RecordWriter>> constructorMap;
-
- static {
- formatRegistry = Maps.newHashMap();
- constructorMap = Maps.newHashMap();
-
- Class<?>[] rwc = PathScanner.scanForImplementationsArr(RecordWriter.class, null);
-
- for(Class<?> clazz : rwc) {
- RecordWriterTemplate template = clazz.getAnnotation(RecordWriterTemplate.class);
- if(template == null){
- logger.warn("{} doesn't have {} annotation. Skipping.", clazz.getCanonicalName(), RecordWriterTemplate.class);
- continue;
- }
-
- if (template.format() == null || template.format().isEmpty()) {
- logger.warn("{} annotation doesn't have valid format field. Skipping.", RecordWriterTemplate.class);
- continue;
- }
-
- // Find the standard empty parameter constructor and store it in map.
- Constructor<?> validConstructor = null;
- for(Constructor<?> c : clazz.getConstructors()) {
- if (c.getParameterTypes().length == 0) {
- validConstructor = c;
- break;
- }
- }
-
- if (validConstructor != null) {
- formatRegistry.put(template.format(), (Class<? extends RecordWriter>)clazz);
- constructorMap.put((Class<? extends RecordWriter>)clazz, (Constructor<? extends RecordWriter>)validConstructor);
- } else {
- logger.info("Skipping RecordWriter class '{}' since it doesn't implement a constructor [{}()]",
- clazz.getCanonicalName(), clazz.getName());
- }
- }
- }
-
- public static RecordWriter get(String format, Map<String, String> options) throws IOException {
-
- if (formatRegistry.containsKey(format)) {
- try {
- RecordWriter writer = constructorMap.get(formatRegistry.get(format)).newInstance();
- writer.init(options);
- return writer;
- } catch(Exception e) {
- logger.debug("Failed to create RecordWriter. Received format: {}, options: {}", format, options, e);
- throw new IOException(
- String.format("Failed to create RecordWriter for format '%s' with options '%s'", format, options), e);
- }
- }
-
- logger.error("Unknown format '{}' received", format);
- throw new IOException(String.format("Unknown format '%s' received", format));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 6254dfb..2d1a9ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.WorkspaceConfig;
@JsonTypeName("file")
public class FileSystemConfig implements StoragePluginConfig{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 6ddc84a..39d71e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import net.hydromatic.optiq.Schema;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
@@ -30,10 +29,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.logical.WorkspaceConfig;
-import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 0314dc9..28496eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.Schema;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.Table;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.rpc.user.DrillUser;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -124,6 +125,11 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
+ public CreateTableEntry createNewTable(String tableName) {
+ return defaultSchema.createNewTable(tableName);
+ }
+
+ @Override
public AbstractSchema getDefaultSchema() {
return defaultSchema;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 788e7ac..20ea0a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -25,6 +25,8 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
@@ -40,6 +42,8 @@ public interface FormatPlugin {
public FormatMatcher getMatcher();
+ public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
+
public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
public Set<StoragePluginOptimizerRule> getOptimizerRules();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
new file mode 100644
index 0000000..4e7fb8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Stores the workspace related config. A workspace has:
+ * - location which is a path.
+ * - writable flag to indicate whether the location supports creating new tables.
+ * - default storage format for new tables created in this workspace.
+ */
+public class WorkspaceConfig {
+
+ /** Default workspace is a root directory which supports read, but not write. */
+ public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null);
+
+ private final String location;
+ private final boolean writable;
+ private final String storageformat;
+
+ public WorkspaceConfig(@JsonProperty("location") String location,
+ @JsonProperty("writable") boolean writable,
+ @JsonProperty("storageformat") String storageformat) {
+ this.location = location;
+ this.writable = writable;
+ this.storageformat = storageformat;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public boolean isWritable() {
+ return writable;
+ }
+
+ @JsonProperty("storageformat")
+ public String getStorageFormat() {
+ return storageformat;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (obj == null || !(obj instanceof WorkspaceConfig)) {
+ return false;
+ }
+
+ WorkspaceConfig that = (WorkspaceConfig) obj;
+ return ((this.location == null && that.location == null) || this.location.equals(that.location)) &&
+ this.writable == that.writable &&
+ ((this.storageformat == null && that.storageformat == null) || this.storageformat.equals(that.storageformat));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index d904297..df73ea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -24,10 +24,8 @@ import java.util.Set;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.Table;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.WorkspaceConfig;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -36,7 +34,6 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-
import org.apache.hadoop.fs.Path;
public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
@@ -114,7 +111,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
public class WorkspaceSchema extends AbstractSchema implements HasFileSystemSchema {
private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(WorkspaceSchemaFactory.this);
- private final UserSession session;
+ private UserSession session;
public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
super(parentSchemaPath, name);
@@ -140,14 +137,22 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
}
@Override
- public CreateTableEntry createNewTable(String tableName) {
- return new FileSystemCreateTableEntry((FileSystemConfig)plugin.getConfig(), config.getStorageFormat(),
- config.getLocation() + Path.SEPARATOR + tableName);
+ public DrillFileSystem getFS() {
+ return fs;
}
@Override
- public DrillFileSystem getFS() {
- return fs;
+ public CreateTableEntry createNewTable(String tableName) {
+ FormatPlugin formatPlugin = plugin.getFormatPlugin(config.getStorageFormat());
+ if (formatPlugin == null)
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format '%s' in workspace '%s'", config.getStorageFormat(),
+ Joiner.on(".").join(getSchemaPath())));
+
+ return new FileSystemCreateTableEntry(
+ (FileSystemConfig) plugin.getConfig(),
+ plugin.getFormatPlugin(config.getStorageFormat()),
+ config.getLocation() + Path.SEPARATOR + tableName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
deleted file mode 100644
index d79e542..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.easy;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-public class EasyBatchCreator implements BatchCreator<EasySubScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
- throws ExecutionSetupException {
- assert children == null || children.isEmpty();
- return config.getFormatPlugin().getBatch(context, config);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 67502ef..2965e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -31,10 +31,14 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -108,8 +112,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
-
- RecordBatch getBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
List<SchemaPath> columns = scan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
@@ -155,7 +158,23 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
-
+
+ public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
+
+ public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
+ throws ExecutionSetupException {
+ try {
+ return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));
+ } catch(IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create the WriterRecordBatch. %s", e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
+ return new EasyWriter(child, location, this);
+ }
+
@Override
public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
return new EasyGroupScan(selection, this, null, selection.selectionRoot);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
new file mode 100644
index 0000000..ac0d2e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ assert children == null || children.isEmpty();
+ return config.getFormatPlugin().getReaderBatch(context, config);
+ }
+}