Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 01:08:50 UTC

[3/8] git commit: DRILL-679: Support create table as query (CTAS) (contd.).

DRILL-679: Support create table as query (CTAS) (contd.).

Continuation of e19606593f3173d8f82ca3074186e9ca7a960ce2.
Refactor and align the writer interfaces with the reader interfaces at the storage and file-format level.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5b7f351e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5b7f351e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5b7f351e

Branch: refs/heads/master
Commit: 5b7f351e3bbc26d5e9a686cd5b0c636bb0b94b2e
Parents: 6dad590
Author: vkorukanti <ve...@gmail.com>
Authored: Fri May 9 23:17:00 2014 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sat May 10 00:25:07 2014 -0700

----------------------------------------------------------------------
 .../drill/common/logical/WorkspaceConfig.java   |  72 --------
 exec/java-exec/src/main/codegen/data/Parser.tdd |   3 +-
 .../templates/StringOutputRecordWriter.java     |   5 +-
 .../exec/physical/base/AbstractWriter.java      |  30 ++++
 .../apache/drill/exec/physical/base/Writer.java |  24 +++
 .../drill/exec/physical/config/Writer.java      |  69 -------
 .../exec/physical/impl/WriterRecordBatch.java   | 174 ++++++++++++++++++
 .../impl/writer/WriterBatchCreator.java         |  36 ----
 .../physical/impl/writer/WriterRecordBatch.java | 179 -------------------
 .../exec/planner/logical/CreateTableEntry.java  |   5 +-
 .../logical/FileSystemCreateTableEntry.java     |  73 ++++----
 .../drill/exec/planner/physical/WriterPrel.java |   3 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   4 +-
 .../sql/handlers/AbstractSqlHandler.java        |  64 +++++++
 .../planner/sql/handlers/DefaultSqlHandler.java |   2 +-
 .../planner/sql/handlers/SetOptionHandler.java  |   2 +-
 .../exec/planner/sql/handlers/SqlHandler.java   |  64 -------
 .../planner/sql/handlers/UseSchemaHandler.java  |   2 +-
 .../exec/planner/sql/handlers/ViewHandler.java  |   2 +-
 .../exec/planner/sql/parser/DrillSqlCall.java   |   4 +-
 .../exec/planner/sql/parser/SqlCreateTable.java |   4 +-
 .../exec/planner/sql/parser/SqlCreateView.java  |   4 +-
 .../planner/sql/parser/SqlDescribeTable.java    |   4 +-
 .../exec/planner/sql/parser/SqlDropView.java    |   4 +-
 .../exec/planner/sql/parser/SqlShowFiles.java   |   4 +-
 .../exec/planner/sql/parser/SqlShowSchemas.java |   4 +-
 .../exec/planner/sql/parser/SqlShowTables.java  |   4 +-
 .../exec/planner/sql/parser/SqlUseSchema.java   |   4 +-
 .../drill/exec/store/RecordWriterRegistry.java  |  91 ----------
 .../drill/exec/store/dfs/FileSystemConfig.java  |   1 -
 .../drill/exec/store/dfs/FileSystemPlugin.java  |   4 -
 .../exec/store/dfs/FileSystemSchemaFactory.java |   6 +
 .../drill/exec/store/dfs/FormatPlugin.java      |   4 +
 .../drill/exec/store/dfs/WorkspaceConfig.java   |  72 ++++++++
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  23 ++-
 .../exec/store/dfs/easy/EasyBatchCreator.java   |  38 ----
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  25 ++-
 .../store/dfs/easy/EasyReaderBatchCreator.java  |  36 ++++
 .../drill/exec/store/dfs/easy/EasyWriter.java   |  97 ++++++++++
 .../store/dfs/easy/EasyWriterBatchCreator.java  |  36 ++++
 .../exec/store/easy/json/JSONFormatPlugin.java  |   8 +
 .../exec/store/easy/text/TextFormatPlugin.java  |  34 +++-
 .../exec/store/parquet/ParquetFormatPlugin.java |   7 +
 .../exec/store/text/DrillTextRecordWriter.java  | 133 ++++++++++++++
 .../exec/store/writer/RecordWriterTemplate.java |  35 ----
 .../exec/store/writer/csv/CSVRecordWriter.java  | 131 --------------
 .../exec/physical/impl/writer/TestWriter.java   |  54 +++++-
 .../src/test/resources/storage-plugins.json     |  15 +-
 .../resources/writer/simple_csv_writer.json     |  87 +++++----
 49 files changed, 942 insertions(+), 844 deletions(-)
----------------------------------------------------------------------
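For orientation, here is a minimal sketch of how the writer-side interfaces line up after this refactoring, pieced together from the diff below. The names follow the changed files, but the types are simplified stand-ins (including the placeholder PhysicalOperator), not the exact Drill sources:

  import java.io.IOException;

  // Sketch only: simplified stand-ins for the types introduced/changed in this commit.
  interface PhysicalOperator { }                    // placeholder for Drill's operator base

  interface Writer extends PhysicalOperator { }     // new marker interface for writer operators

  interface CreateTableEntry {
    // the planner-side entry now returns a Writer physical operator for a given child plan
    Writer getWriter(PhysicalOperator child) throws IOException;
  }

  interface FormatPlugin {
    // each file-format plugin supplies the concrete writer, mirroring the reader path
    Writer getWriter(PhysicalOperator child, String location) throws IOException;
  }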


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java b/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
deleted file mode 100644
index cd1b1c4..0000000
--- a/common/src/main/java/org/apache/drill/common/logical/WorkspaceConfig.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.common.logical;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Stores the workspace related config. A workspace has:
- *  - location which is a path.
- *  - writable flag to indicate whether the location supports creating new tables.
- *  - default storage format for new tables created in this workspace.
- */
-public class WorkspaceConfig {
-
-  /** Default workspace is a root directory which supports read, but not write. */
-  public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null);
-
-  private final String location;
-  private final boolean writable;
-  private final String storageformat;
-
-  public WorkspaceConfig(@JsonProperty("location") String location,
-                         @JsonProperty("writable") boolean writable,
-                         @JsonProperty("storageformat") String storageformat) {
-    this.location = location;
-    this.writable = writable;
-    this.storageformat = storageformat;
-  }
-
-  public String getLocation() {
-    return location;
-  }
-
-  public boolean isWritable() {
-    return writable;
-  }
-
-  @JsonProperty("storageformat")
-  public String getStorageFormat() {
-    return storageformat;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == this)
-      return true;
-
-    if (obj == null || !(obj instanceof WorkspaceConfig)) {
-      return false;
-    }
-
-    WorkspaceConfig that = (WorkspaceConfig) obj;
-    return ((this.location == null && that.location == null) || this.location.equals(that.location)) &&
-        this.writable == that.writable &&
-        ((this.storageformat == null && that.storageformat == null) || this.storageformat.equals(that.storageformat));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 581d645..d781f32 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -45,8 +45,7 @@
     "SqlUseSchema()",
     "SqlCreateOrReplaceView()",
     "SqlDropView()",
-    "SqlShowFiles()"
-    "SqlDropView()",
+    "SqlShowFiles()",
     "SqlCreateTable()"
   ]
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 629734f..506cace 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -114,7 +114,10 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
 
   <#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
     addField(fieldId, valueHolder.toString());
-  </#if>
+  <#else>
+    throw new UnsupportedOperationException(String.format("Unsupported field type: %s",
+      valueHolder.getCanonicalClass()));
+  </#if>
   }
     </#list>
   </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
new file mode 100644
index 0000000..af23d5f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+public abstract class AbstractWriter extends AbstractSingle implements Writer{
+
+  public AbstractWriter(PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitWriter(this, value);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
new file mode 100644
index 0000000..f33bf0a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Writer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.base;
+
+/** Writer physical operator */
+public interface Writer extends PhysicalOperator{
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
deleted file mode 100644
index 7140bcd..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Writer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractSingle;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.Size;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.planner.logical.CreateTableEntry;
-
-@JsonTypeName("writer")
-public class Writer extends AbstractSingle {
-
-  public final CreateTableEntry createTableEntry;
-
-  public Writer(@JsonProperty("child") PhysicalOperator child,
-                @JsonProperty("createTableEntry") CreateTableEntry createTableEntry) {
-    super(child);
-    this.createTableEntry = createTableEntry;
-  }
-
-  public CreateTableEntry getCreateTableEntry() {
-    return createTableEntry;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitWriter(this, value);
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    /* Compute the total size (row count * row size) */
-    Size size     = child.getSize();
-    long diskSize = size.getRecordCount() * size.getRecordSize();
-
-    return new OperatorCost(0, diskSize, 0, child.getSize().getRecordCount());
-  }
-
-  @Override
-  public Size getSize() {
-    return child.getSize();
-  }
-
-  @Override
-  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new Writer(child, createTableEntry);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
new file mode 100644
index 0000000..81a4d58
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.RecordValueAccessor;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+
+import java.io.IOException;
+
+/* Write the RecordBatch to the given RecordWriter. */
+public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
+
+  private EventBasedRecordWriter eventBasedRecordWriter;
+  private RecordWriter recordWriter;
+  private int counter = 0;
+  private final RecordBatch incoming;
+  private boolean first = true;
+  private boolean processed = false;
+  private String fragmentUniqueId;
+
+  public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
+    super(writer, context);
+    this.incoming = incoming;
+
+    FragmentHandle handle = context.getHandle();
+    fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+    this.recordWriter = recordWriter;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
+  }
+
+  @Override
+  protected void killIncoming() {
+    incoming.kill();
+  }
+
+  @Override
+  public IterOutcome next() {
+    if(processed) {
+      // if the upstream record batch is already processed and next() is called by
+      // downstream then return NONE to indicate completion
+      return IterOutcome.NONE;
+    }
+
+    // process the complete upstream in one next() call
+    IterOutcome upstream;
+    do {
+      upstream = incoming.next();
+      if(first && upstream == IterOutcome.OK)
+        upstream = IterOutcome.OK_NEW_SCHEMA;
+      first = false;
+
+      switch(upstream) {
+        case NOT_YET:
+        case NONE:
+        case STOP:
+          cleanup();
+          if (upstream == IterOutcome.STOP)
+            return upstream;
+          break;
+
+        case OK_NEW_SCHEMA:
+          try{
+            setupNewSchema();
+          }catch(Exception ex){
+            kill();
+            logger.error("Failure during query", ex);
+            context.fail(ex);
+            return IterOutcome.STOP;
+          }
+          // fall through.
+        case OK:
+          try {
+            counter += eventBasedRecordWriter.write();
+            logger.debug("Total records written so far: {}", counter);
+          } catch(IOException ex) {
+            throw new RuntimeException(ex);
+          }
+
+          for(VectorWrapper v : incoming)
+            v.getValueVector().clear();
+
+          break;
+
+        default:
+          throw new UnsupportedOperationException();
+      }
+    } while(upstream != IterOutcome.NONE);
+
+    // Create two vectors for:
+    //   1. Fragment unique id.
+    //   2. Summary: currently contains number of records written.
+    MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
+    MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
+
+    VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
+    AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+    BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
+    AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+
+
+    container.add(fragmentIdVector);
+    container.add(summaryVector);
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
+    fragmentIdVector.getMutator().setValueCount(1);
+    summaryVector.getMutator().setSafe(0, counter);
+    summaryVector.getMutator().setValueCount(1);
+
+    container.setRecordCount(1);
+    processed = true;
+
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  protected void setupNewSchema() throws Exception {
+    try {
+      // update the schema in RecordWriter
+      recordWriter.updateSchema(incoming.getSchema());
+    } catch(IOException ex) {
+      throw new RuntimeException("Failed to update schema in RecordWriter", ex);
+    }
+
+    eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
+        new RecordValueAccessor(incoming), recordWriter);
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    incoming.cleanup();
+    try {
+      if (recordWriter != null) {
+        recordWriter.cleanup();
+      }
+    } catch(IOException ex) {
+      throw new RuntimeException("Failed to close RecordWriter", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
deleted file mode 100644
index 4d7baad..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterBatchCreator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.impl.writer;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Writer;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import java.util.List;
-
-public class WriterBatchCreator implements BatchCreator<Writer> {
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, Writer config, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    return new WriterRecordBatch(config, children.iterator().next(), context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
deleted file mode 100644
index 81b2f5c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/writer/WriterRecordBatch.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.physical.impl.writer;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Writer;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.RecordValueAccessor;
-import org.apache.drill.exec.store.EventBasedRecordWriter;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-
-import java.io.IOException;
-
-/* Write the given RecordBatch to the given file in specified format. */
-public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
-
-  private EventBasedRecordWriter eventBasedRecordWriter;
-  private RecordWriter recordWriter;
-  private int counter = 0;
-  private final RecordBatch incoming;
-  private boolean first = true;
-  private boolean processed = false;
-  private String fragmentUniqueId;
-
-  public WriterRecordBatch(Writer pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
-    super(pop, context);
-    this.incoming = incoming;
-
-    FragmentHandle handle = context.getHandle();
-    fragmentUniqueId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
-    try {
-      recordWriter = pop.getCreateTableEntry().getRecordWriter(fragmentUniqueId);
-    } catch(IOException ex) {
-      throw new RuntimeException("Failed to create RecordWriter", ex);
-    }
-  }
-
-  @Override
-  public int getRecordCount() {
-    return container.getRecordCount();
-  }
-
-  @Override
-  protected void killIncoming() {
-    incoming.kill();
-  }
-
-  @Override
-  public IterOutcome next() {
-    if(processed) {
-      // if the upstream record batch is already processed and next() called by
-      // downstream then return NONE to indicate completion
-      return IterOutcome.NONE;
-    }
-
-    // process the complete upstream in one next() call
-    IterOutcome upstream;
-    do {
-      upstream = incoming.next();
-      if(first && upstream == IterOutcome.OK)
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-      first = false;
-
-      switch(upstream) {
-      case NOT_YET:
-      case NONE:
-      case STOP:
-        cleanup();
-        if (upstream == IterOutcome.STOP)
-          return upstream;
-        break;
-
-      case OK_NEW_SCHEMA:
-        try{
-          setupNewSchema();
-        }catch(Exception ex){
-          kill();
-          logger.error("Failure during query", ex);
-          context.fail(ex);
-          return IterOutcome.STOP;
-        }
-        // fall through.
-      case OK:
-        try {
-          counter += eventBasedRecordWriter.write();
-          logger.debug("Total records written so far: {}", counter);
-        } catch(IOException ex) {
-          throw new RuntimeException(ex);
-        }
-
-        for(VectorWrapper v : incoming)
-          v.getValueVector().clear();
-
-        break;
-
-      default:
-        throw new UnsupportedOperationException();
-      }
-    } while(upstream != IterOutcome.NONE);
-
-    // Create two vectors for:
-    //   1. Fragment unique id.
-    //   2. Summary: currently contains number of records written.
-    MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("fragment"), Types.required(MinorType.VARCHAR));
-    MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
-
-    VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
-    AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-    BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
-    AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-
-
-    container.add(fragmentIdVector);
-    container.add(summaryVector);
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-    fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
-    fragmentIdVector.getMutator().setValueCount(1);
-    summaryVector.getMutator().setSafe(0, counter);
-    summaryVector.getMutator().setValueCount(1);
-
-    container.setRecordCount(1);
-    processed = true;
-
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  protected void setupNewSchema() throws Exception {
-    try {
-      // update the schema in RecordWriter
-      recordWriter.updateSchema(incoming.getSchema());
-    } catch(IOException ex) {
-      throw new RuntimeException("Failed to update schema in RecordWriter", ex);
-    }
-
-    eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
-        new RecordValueAccessor(incoming), recordWriter);
-  }
-
-  @Override
-  public void cleanup() {
-    super.cleanup();
-    incoming.cleanup();
-    try {
-      if (recordWriter != null) {
-        recordWriter.cleanup();
-      }
-    } catch(IOException ex) {
-      throw new RuntimeException("Failed to close RecordWriter", ex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
index 90e3a79..7f6acd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
@@ -21,7 +21,8 @@ package org.apache.drill.exec.planner.logical;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
 
 import java.io.IOException;
 
@@ -35,5 +36,5 @@ import java.io.IOException;
 })
 public interface CreateTableEntry {
 
-  RecordWriter getRecordWriter(String fragmentUniqueId) throws IOException;
+  Writer getWriter(PhysicalOperator child) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 78f2007..88d17a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec.planner.logical;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.ImmutableMap;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.RecordWriterRegistry;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Writer;
+import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
  * Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage.
@@ -34,39 +37,41 @@ import java.util.Map;
 @JsonTypeName("filesystem")
 public class FileSystemCreateTableEntry implements CreateTableEntry {
 
-  public FileSystemConfig config;
-  public String format;
-  public String location;
+  private FileSystemConfig storageConfig;
+  private FormatPlugin formatPlugin;
+  private String location;
 
-  /**
-   * Create an entry.
-   * @param config Storage configuration.
-   * @param format Output format such as "csv", "parquet" etc.
-   * @param location Directory where the data files for the new table are created.
-   */
-  public FileSystemCreateTableEntry(@JsonProperty("config") FileSystemConfig config,
-                                    @JsonProperty("format") String format,
-                                    @JsonProperty("location") String location) {
-    this.config = config;
-    this.format = format;
+  @JsonCreator
+  public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
+                                    @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
+                                    @JsonProperty("location") String location,
+                                    @JacksonInject StoragePluginRegistry engineRegistry)
+      throws ExecutionSetupException {
+    this.storageConfig = storageConfig;
+    this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     this.location = location;
   }
 
-  /**
-   * Returns an implementation of the RecordWriter which is used to write data to new table.
-   * @param prefix Fragment unique identifier to prefix to files created by RecordWriter.
-   *               This is needed to avoid fragments overwriting file in parallel overwriting others.
-   * @return A RecordWriter object.
-   * @throws IOException
-   */
-  @Override
-  public RecordWriter getRecordWriter(String prefix) throws IOException {
-    Map<String, String> options = ImmutableMap.of(
-        "location", location,
-        FileSystem.FS_DEFAULT_NAME_KEY, config.connection,
-        "prefix", prefix
-    );
+  public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
+                                    FormatPlugin formatPlugin,
+                                    String location) {
+    this.storageConfig = storageConfig;
+    this.formatPlugin = formatPlugin;
+    this.location = location;
+  }
+
+  @JsonProperty("storageConfig")
+  public FileSystemConfig getStorageConfig() {
+    return storageConfig;
+  }
+
+  @JsonProperty("formatConfig")
+  public FormatPluginConfig getFormatConfig() {
+    return formatPlugin.getConfig();
+  }
 
-    return RecordWriterRegistry.get(format, options);
+  @Override
+  public Writer getWriter(PhysicalOperator child) throws IOException {
+    return formatPlugin.getWriter(child, location);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index a1f1dec..98a42de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.Writer;
 import org.apache.drill.exec.planner.common.DrillWriterRelBase;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.eigenbase.rel.RelNode;
@@ -42,7 +41,7 @@ public class WriterPrel extends DrillWriterRelBase implements Prel {
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getChild();
-    return  new Writer(child.getPhysicalOperator(creator), getCreateTableEntry());
+    return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 18b645b..bd57785 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -35,10 +35,10 @@ import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
 import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
 import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
 import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable;
@@ -109,7 +109,7 @@ public class DrillSqlWorker {
   public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{
     SqlNode sqlNode = planner.parse(sql);
 
-    SqlHandler handler;
+    AbstractSqlHandler handler;
 
     // TODO: make this use path scanning or something similar.
     switch(sqlNode.getKind()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
new file mode 100644
index 0000000..492c60a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+
+import com.google.common.base.Joiner;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RelConversionException;
+import net.hydromatic.optiq.tools.ValidationException;
+
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.eigenbase.sql.SqlNode;
+
+public abstract class AbstractSqlHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSqlHandler.class);
+
+  public abstract PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException;
+
+  public static <T> T unwrap(Object o, Class<T> clazz) throws RelConversionException{
+    if(clazz.isAssignableFrom(o.getClass())){
+      return (T) o;
+    }else{
+      throw new RelConversionException(String.format("Failure trying to treat %s as type %s.", o.getClass().getSimpleName(), clazz.getSimpleName()));
+    }
+  }
+
+  /**
+   * Given a SchemaPlus, return the underlying mutable Drill schema (AbstractSchema) if one exists.
+   * Otherwise throw an error.
+   */
+  public static AbstractSchema getMutableDrillSchema(SchemaPlus schemaPlus) throws Exception{
+    AbstractSchema drillSchema;
+    try {
+      drillSchema = schemaPlus.unwrap(AbstractSchema.class);
+      drillSchema = drillSchema.getDefaultSchema();
+    } catch(ClassCastException e) {
+      throw new Exception("Current schema is not a Drill schema. " +
+              "Can't create new relations (tables or views) in non-Drill schemas.", e);
+    }
+
+    if (!drillSchema.isMutable())
+      throw new Exception(String.format("Current schema '%s' is not a mutable schema. " +
+          "Can't create new relations.", Joiner.on(".").join(drillSchema.getSchemaPath())));
+
+    return drillSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 84517ad..d107c29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -51,7 +51,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class DefaultSqlHandler extends SqlHandler{
+public class DefaultSqlHandler extends AbstractSqlHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSqlHandler.class);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 5033e50..eafc8d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -29,7 +29,7 @@ import org.eigenbase.sql.SqlLiteral;
 import org.eigenbase.sql.SqlNode;
 import org.eigenbase.sql.SqlSetOption;
 
-public class SetOptionHandler extends SqlHandler{
+public class SetOptionHandler extends AbstractSqlHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
 
   QueryContext context;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
deleted file mode 100644
index 1b23c1e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.io.IOException;
-
-import com.google.common.base.Joiner;
-import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.tools.RelConversionException;
-import net.hydromatic.optiq.tools.ValidationException;
-
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.eigenbase.sql.SqlNode;
-
-public abstract class SqlHandler {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlHandler.class);
-
-  public abstract PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException;
-
-  public static <T> T unwrap(Object o, Class<T> clazz) throws RelConversionException{
-    if(clazz.isAssignableFrom(o.getClass())){
-      return (T) o;
-    }else{
-      throw new RelConversionException(String.format("Failure trying to treat %s as type %s.", o.getClass().getSimpleName(), clazz.getSimpleName()));
-    }
-  }
-
-  /**
-   * From a given SchemaPlus return a mutable Drill schema object AbstractSchema if exists.
-   * Otherwise throw errors.
-   */
-  public static AbstractSchema getMutableDrillSchema(SchemaPlus schemaPlus) throws Exception{
-    AbstractSchema drillSchema;
-    try {
-      drillSchema = schemaPlus.unwrap(AbstractSchema.class);
-      drillSchema = drillSchema.getDefaultSchema();
-    } catch(ClassCastException e) {
-      throw new Exception("Current schema is not a Drill schema. " +
-              "Can't create new relations (tables or views) in non-Drill schemas.", e);
-    }
-
-    if (!drillSchema.isMutable())
-      throw new Exception(String.format("Current schema '%s' is not a mutable schema. " +
-          "Can't create new relations.", Joiner.on(".").join(drillSchema.getSchemaPath())));
-
-    return drillSchema;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
index 5b12259..9b23e23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
 import org.eigenbase.sql.SqlNode;
 
-public class UseSchemaHandler extends SqlHandler{
+public class UseSchemaHandler extends AbstractSqlHandler{
   QueryContext context;
 
   public UseSchemaHandler(QueryContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
index 5c221e7..babe015 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
@@ -38,7 +38,7 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.sql.SqlNode;
 
-public abstract class ViewHandler extends SqlHandler{
+public abstract class ViewHandler extends AbstractSqlHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewHandler.class);
 
   protected Planner planner;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
index 32f57b4..5fa592a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.eigenbase.sql.SqlCall;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -33,7 +33,7 @@ public abstract class DrillSqlCall extends SqlCall {
     super(pos);
   }
 
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new DefaultSqlHandler(planner, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index ed394d7..c29296b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.CreateTableHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -72,7 +72,7 @@ public class SqlCreateTable extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new CreateTableHandler(planner, context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
index b124e1d..68176ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.ViewHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
@@ -79,7 +79,7 @@ public class SqlCreateView extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new ViewHandler.CreateView(planner, context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
index 0a666de..30c7e43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -71,7 +71,7 @@ public class SqlDescribeTable extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new DescribeTableHandler(planner, context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
index 8ba7edd..7afe428 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
 import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.ViewHandler.DropView;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
@@ -55,7 +55,7 @@ public class SqlDropView extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new DropView(context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
index b7fba06..f882ba9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.planner.sql.parser;
 import com.google.common.collect.Lists;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler;
 import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -64,7 +64,7 @@ public class SqlShowFiles extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new ShowFileHandler(planner, context);
   }
   public SqlIdentifier getDb() { return db; }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
index aa212a7..34695d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.ShowSchemasHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -70,7 +70,7 @@ public class SqlShowSchemas extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new ShowSchemasHandler(planner, context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
index e3be378..26d4fa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
 
@@ -74,7 +74,7 @@ public class SqlShowTables extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new ShowTablesHandler(planner, context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
index 92d3aff..42a3914 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.planner.sql.parser;
 import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.tools.Planner;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.planner.sql.handlers.SqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
 import org.eigenbase.sql.*;
 import org.eigenbase.sql.parser.SqlParserPos;
@@ -59,7 +59,7 @@ public class SqlUseSchema extends DrillSqlCall {
   }
 
   @Override
-  public SqlHandler getSqlHandler(Planner planner, QueryContext context) {
+  public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
     return new UseSchemaHandler(context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
deleted file mode 100644
index 8c79fed..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordWriterRegistry.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import com.google.common.collect.Maps;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.store.writer.RecordWriterTemplate;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class RecordWriterRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordWriterRegistry.class);
-
-  // Contains mapping of "format" to RecordWriter implementation class.
-  private static Map<String, Class<? extends RecordWriter>> formatRegistry;
-
-  // Contains mapping of RecordWriter class to standard constructor that is used to instantiate the RecordWriter object.
-  private static Map<Class<? extends RecordWriter>, Constructor<? extends RecordWriter>> constructorMap;
-
-  static {
-    formatRegistry = Maps.newHashMap();
-    constructorMap = Maps.newHashMap();
-
-    Class<?>[] rwc = PathScanner.scanForImplementationsArr(RecordWriter.class, null);
-
-    for(Class<?> clazz : rwc) {
-      RecordWriterTemplate template = clazz.getAnnotation(RecordWriterTemplate.class);
-      if(template == null){
-        logger.warn("{} doesn't have {} annotation. Skipping.", clazz.getCanonicalName(), RecordWriterTemplate.class);
-        continue;
-      }
-
-      if (template.format() == null || template.format().isEmpty()) {
-        logger.warn("{} annotation doesn't have valid format field. Skipping.", RecordWriterTemplate.class);
-        continue;
-      }
-
-      // Find the standard empty parameter constructor and store it in map.
-      Constructor<?> validConstructor = null;
-      for(Constructor<?> c : clazz.getConstructors()) {
-        if (c.getParameterTypes().length == 0) {
-          validConstructor = c;
-          break;
-        }
-      }
-
-      if (validConstructor != null) {
-        formatRegistry.put(template.format(), (Class<? extends RecordWriter>)clazz);
-        constructorMap.put((Class<? extends RecordWriter>)clazz, (Constructor<? extends RecordWriter>)validConstructor);
-      } else {
-        logger.info("Skipping RecordWriter class '{}' since it doesn't implement a constructor [{}()]",
-            clazz.getCanonicalName(), clazz.getName());
-      }
-    }
-  }
-
-  public static RecordWriter get(String format, Map<String, String> options) throws IOException {
-
-    if (formatRegistry.containsKey(format)) {
-      try {
-        RecordWriter writer = constructorMap.get(formatRegistry.get(format)).newInstance();
-        writer.init(options);
-        return writer;
-      } catch(Exception e) {
-        logger.debug("Failed to create RecordWriter. Received format: {}, options: {}", format, options, e);
-        throw new IOException(
-            String.format("Failed to create RecordWriter for format '%s' with options '%s'", format, options), e);
-      }
-    }
-
-    logger.error("Unknown format '{}' received", format);
-    throw new IOException(String.format("Unknown format '%s' received", format));
-  }
-}
\ No newline at end of file
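
With the registry gone, record writers are no longer discovered by scanning the classpath for a format name; the format plugin that owns the target table now constructs its own writer (see the FormatPlugin and EasyFormatPlugin changes below). A minimal before/after sketch, where the format name, options map, and local variables are illustrative only:

  // before: global, reflection-based lookup keyed by a format string
  RecordWriter writer = RecordWriterRegistry.get("csv", options);

  // after: the format plugin owning the target table builds the writer itself
  RecordWriter writer = easyFormatPlugin.getRecordWriter(fragmentContext, easyWriter);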

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 6254dfb..2d1a9ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.WorkspaceConfig;
 
 @JsonTypeName("file")
 public class FileSystemConfig implements StoragePluginConfig{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 6ddc84a..39d71e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.JSONOptions;
@@ -30,10 +29,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.logical.WorkspaceConfig;
-import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.DrillUser;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 0314dc9..28496eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
 
 import net.hydromatic.optiq.Table;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.rpc.user.DrillUser;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -124,6 +125,11 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     }
 
     @Override
+    public CreateTableEntry createNewTable(String tableName) {
+      return defaultSchema.createNewTable(tableName);
+    }
+
+    @Override
     public AbstractSchema getDefaultSchema() {
       return defaultSchema;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 788e7ac..20ea0a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -25,6 +25,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
@@ -40,6 +42,8 @@ public interface FormatPlugin {
   
   public FormatMatcher getMatcher();
 
+  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
+
   public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
 
   public Set<StoragePluginOptimizerRule> getOptimizerRules();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
new file mode 100644
index 0000000..4e7fb8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Stores the workspace related config. A workspace has:
+ *  - location which is a path.
+ *  - writable flag to indicate whether the location supports creating new tables.
+ *  - default storage format for new tables created in this workspace.
+ */
+public class WorkspaceConfig {
+
+  /** Default workspace is a root directory which supports read, but not write. */
+  public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null);
+
+  private final String location;
+  private final boolean writable;
+  private final String storageformat;
+
+  public WorkspaceConfig(@JsonProperty("location") String location,
+                         @JsonProperty("writable") boolean writable,
+                         @JsonProperty("storageformat") String storageformat) {
+    this.location = location;
+    this.writable = writable;
+    this.storageformat = storageformat;
+  }
+
+  public String getLocation() {
+    return location;
+  }
+
+  public boolean isWritable() {
+    return writable;
+  }
+
+  @JsonProperty("storageformat")
+  public String getStorageFormat() {
+    return storageformat;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this)
+      return true;
+
+    if (obj == null || !(obj instanceof WorkspaceConfig)) {
+      return false;
+    }
+
+    WorkspaceConfig that = (WorkspaceConfig) obj;
+    return ((this.location == null && that.location == null) || this.location.equals(that.location)) &&
+        this.writable == that.writable &&
+        ((this.storageformat == null && that.storageformat == null) || this.storageformat.equals(that.storageformat));
+  }
+}
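
WorkspaceConfig is the unit of configuration that CTAS consults: the writable flag marks whether new tables may be created under the workspace's location, and storageformat picks the format plugin used for those tables. A minimal sketch of an entry and the object it deserializes to; the workspace name, path, and format below are illustrative, and the surrounding JSON shape of the "file" storage plugin config is an assumption rather than something shown in this patch:

  //   "tmp" : { "location" : "/tmp/drill", "writable" : true, "storageformat" : "csv" }
  WorkspaceConfig tmp = new WorkspaceConfig("/tmp/drill", true, "csv");
  assert tmp.isWritable();                      // new tables may be created under this workspace
  assert "csv".equals(tmp.getStorageFormat());  // they default to the workspace's format plugin
  // WorkspaceConfig.DEFAULT maps the filesystem root ("/") read-only, with no storage format.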

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index d904297..df73ea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -24,10 +24,8 @@ import java.util.Set;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.WorkspaceConfig;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -36,7 +34,6 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-
 import org.apache.hadoop.fs.Path;
 
 public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
@@ -114,7 +111,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
   public class WorkspaceSchema extends AbstractSchema implements HasFileSystemSchema {
 
     private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(WorkspaceSchemaFactory.this);
-    private final UserSession session;
+    private UserSession session;
 
     public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
       super(parentSchemaPath, name);
@@ -140,14 +137,22 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName) {
-      return new FileSystemCreateTableEntry((FileSystemConfig)plugin.getConfig(), config.getStorageFormat(),
-          config.getLocation() + Path.SEPARATOR + tableName);
+    public DrillFileSystem getFS() {
+      return fs;
     }
 
     @Override
-    public DrillFileSystem getFS() {
-      return fs;
+    public CreateTableEntry createNewTable(String tableName) {
+      FormatPlugin formatPlugin = plugin.getFormatPlugin(config.getStorageFormat());
+      if (formatPlugin == null)
+        throw new UnsupportedOperationException(
+          String.format("Unsupported format '%s' in workspace '%s'", config.getStorageFormat(),
+              Joiner.on(".").join(getSchemaPath())));
+
+      return new FileSystemCreateTableEntry(
+          (FileSystemConfig) plugin.getConfig(),
+          plugin.getFormatPlugin(config.getStorageFormat()),
+          config.getLocation() + Path.SEPARATOR + tableName);
     }
   }
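
createNewTable() now resolves the workspace's configured storage format to a concrete FormatPlugin up front and fails fast on an unknown format, so a CTAS such as CREATE TABLE dfs.tmp.nation AS SELECT ... (schema and table names illustrative) ends up with a FileSystemCreateTableEntry carrying both the format plugin and the target path. A small sketch of the resulting values, assuming the workspace from the example above; workspaceSchema and the table name are illustrative only:

  // workspace: location="/tmp/drill", storageformat="csv"
  CreateTableEntry entry = workspaceSchema.createNewTable("nation");
  // entry wraps the "csv" FormatPlugin and the path "/tmp/drill" + Path.SEPARATOR + "nation"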
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
deleted file mode 100644
index d79e542..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs.easy;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-public class EasyBatchCreator implements BatchCreator<EasySubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyBatchCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    assert children == null || children.isEmpty();
-    return config.getFormatPlugin().getBatch(context, config);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 67502ef..2965e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -31,10 +31,14 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -108,8 +112,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
 
-  
-  RecordBatch getBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+  RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
     String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     List<SchemaPath> columns = scan.getColumns();
     List<RecordReader> readers = Lists.newArrayList();
@@ -155,7 +158,23 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
     return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
   }
-  
+
+  public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
+
+  public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
+      throws ExecutionSetupException {
+    try {
+      return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));
+    } catch(IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create the WriterRecordBatch. %s", e.getMessage()), e);
+    }
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
+    return new EasyWriter(child, location, this);
+  }
+
   @Override
   public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
     return new EasyGroupScan(selection, this, null, selection.selectionRoot);
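
The write path in the easy framework now mirrors the read path: at plan time getWriter() wraps the upstream operator in an EasyWriter, and at execution time getWriterBatch() drains the incoming batch through the format-specific RecordWriter inside a WriterRecordBatch. A sketch of the two halves, where child, context, incoming, the location string, and the plugin variable are illustrative only; the signatures come from the hunks above:

  // planning: attach a writer operator for this format to the fragment producing the rows
  AbstractWriter writerOp = easyFormatPlugin.getWriter(child, "/tmp/drill/nation");

  // execution: wrap the upstream batch in a WriterRecordBatch driven by the format's RecordWriter
  RecordBatch writerBatch = easyFormatPlugin.getWriterBatch(context, incoming, (EasyWriter) writerOp);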

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
new file mode 100644
index 0000000..ac0d2e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    return config.getFormatPlugin().getReaderBatch(context, config);
+  }
+}