You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 01:08:49 UTC

[2/8] DRILL-679: Support create table as query (CTAS) (contd.).

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
new file mode 100644
index 0000000..864ae48
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("fs-writer")
+public class EasyWriter extends AbstractWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
+
+  private final String location;
+  private final EasyFormatPlugin<?> formatPlugin;
+
+  @JsonCreator
+  public EasyWriter(
+      @JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("location") String location,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+
+    super(child);
+    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+    this.location = location;
+  }
+
+  public EasyWriter(PhysicalOperator child,
+                         String location,
+                         EasyFormatPlugin<?> formatPlugin) {
+
+    super(child);
+    this.formatPlugin = formatPlugin;
+    this.location = location;
+  }
+
+  @JsonProperty("location")
+  public String getLocation() {
+    return location;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig(){
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig(){
+    return formatPlugin.getConfig();
+  }
+
+  @JsonIgnore
+  public EasyFormatPlugin getFormatPlugin(){
+    return formatPlugin;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new EasyWriter(child, location, formatPlugin);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    // TODO:
+    return new OperatorCost(1,1,1,1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
new file mode 100644
index 0000000..c91ceba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children != null && children.size() == 1;
+    return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 8bdb1ee..872052c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.json;
 
+import java.io.IOException;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -29,7 +30,9 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
@@ -52,6 +55,11 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
   }
 
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("Json Writer is not supported currently.");
+  }
+
   @JsonTypeName("json")
   public static class JSONFormatConfig implements FormatPluginConfig {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 0dbed89..f6cc58e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,34 +17,37 @@
  */
 package org.apache.drill.exec.store.easy.text;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Maps;
 import com.google.common.base.Preconditions;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.FileSplit;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
 
@@ -71,6 +74,27 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project? 
   }
 
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    Map<String, String> options = Maps.newHashMap();
+
+    options.put("location", writer.getLocation());
+
+    FragmentHandle handle = context.getHandle();
+    String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+    options.put("prefix", fragmentId);
+
+    options.put("separator", ((TextFormatConfig)getConfig()).getDelimiter());
+    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
+
+    options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
+
+    RecordWriter recordWriter = new DrillTextRecordWriter();
+    recordWriter.init(options);
+
+    return recordWriter;
+  }
+
   @JsonTypeName("text")
   public static class TextFormatConfig implements FormatPluginConfig {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index ec6456b..a10d30f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -24,6 +24,8 @@ import java.util.regex.Pattern;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
@@ -101,6 +103,11 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
+    throw new UnsupportedOperationException("Parquet Writer is not supported currently.");
+  }
+
+  @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
     return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
new file mode 100644
index 0000000..b6840f8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import com.google.common.base.Joiner;
+import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+public class DrillTextRecordWriter extends StringOutputRecordWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+
+  private String location;
+  private String prefix;
+
+  private String fieldDelimiter;
+  private String extension;
+
+  private static String eol = System.getProperty("line.separator");
+  private int index;
+  private PrintStream stream = null;
+  private FileSystem fs = null;
+
+  // Record write status
+  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
+  private StringBuilder currentRecord; // contains the current record separated by field delimiter
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    this.location = writerOptions.get("location");
+    this.prefix = writerOptions.get("prefix");
+    this.fieldDelimiter = writerOptions.get("separator");
+    this.extension = writerOptions.get("extension");
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    this.fs = FileSystem.get(conf);
+
+    this.currentRecord = new StringBuilder();
+    this.index = 0;
+  }
+
+  @Override
+  public void startNewSchema(List<String> columnNames) throws IOException {
+    // wrap up the current file
+    cleanup();
+
+    // open a new file for writing data with new schema
+    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    try {
+      DataOutputStream fos = fs.create(fileName);
+      stream = new PrintStream(fos);
+      logger.debug("Created file: {}", fileName);
+    } catch (IOException ex) {
+      logger.error("Unable to create file: " + fileName, ex);
+      throw ex;
+    }
+    index++;
+
+    stream.println(Joiner.on(fieldDelimiter).join(columnNames));
+  }
+
+  @Override
+  public void addField(int fieldId, String value) throws IOException {
+    currentRecord.append(value + fieldDelimiter);
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    if (fRecordStarted)
+      throw new IOException("Previous record is not written completely");
+
+    fRecordStarted = true;
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    if (!fRecordStarted)
+      throw new IOException("No record is in writing");
+
+    // remove the extra delimiter at the end
+    currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length());
+
+    stream.println(currentRecord.toString());
+
+    // reset current record status
+    currentRecord.delete(0, currentRecord.length());
+    fRecordStarted = false;
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    if (stream != null) {
+      stream.close();
+      stream = null;
+      logger.debug("closing file");
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    cleanup();
+    try {
+      fs.delete(new Path(location), true);
+    } catch (IOException ex) {
+      logger.error("Abort failed. There could be leftover output files");
+      throw ex;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
deleted file mode 100644
index be7b873..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
-public @interface RecordWriterTemplate {
-
-  /**
-   * Output format identifier used to identify the RecordWriter implementation.
-   * @return
-   */
-  String format();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
deleted file mode 100644
index 8e7c58c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer.csv;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import org.apache.drill.exec.store.StringOutputRecordWriter;
-import org.apache.drill.exec.store.writer.RecordWriterTemplate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-@RecordWriterTemplate(format = "csv")
-public class CSVRecordWriter extends StringOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CSVRecordWriter.class);
-
-  private String location;      // directory where to write the CSV files
-  private String prefix;        // prefix to output file names.
-  private int index;
-
-  private PrintStream stream = null;
-  private FileSystem fs = null;
-
-  // Record write status
-  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
-  private StringBuilder currentRecord;    // contains the current record separated by commas
-
-  private static String eol = System.getProperty("line.separator");
-
-  @Override
-  public void init(Map<String, String> writerOptions) throws IOException {
-    this.location = writerOptions.get("location");
-    this.prefix = writerOptions.get("prefix");
-    this.index = 0;
-
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
-
-    currentRecord = new StringBuilder();
-  }
-
-  @Override
-  public void startNewSchema(List<String> columnNames) throws IOException {
-    // wrap up the current file
-    cleanup();
-
-    // open a new file for writing data with new schema
-    Path fileName = new Path(location, prefix + "_" + index + ".csv");
-    try {
-      DataOutputStream fos = fs.create(fileName);
-      stream = new PrintStream(fos);
-      logger.debug("CSVWriter: created file: {}", fileName);
-    } catch (IOException ex) {
-      logger.error("Unable to create file: " + fileName, ex);
-      throw ex;
-    }
-    index++;
-
-    stream.println(Joiner.on(",").join(columnNames));
-  }
-
-  @Override
-  public void addField(int fieldId, String value) throws IOException {
-    currentRecord.append(value + ",");
-  }
-
-  @Override
-  public void startRecord() throws IOException {
-    if (fRecordStarted)
-      throw new IOException("Previous record is not written completely");
-
-    fRecordStarted = true;
-  }
-
-  @Override
-  public void endRecord() throws IOException {
-    if (!fRecordStarted)
-      throw new IOException("No record is in writing");
-
-    // remove the extra "," at the end
-    currentRecord.deleteCharAt(currentRecord.length()-1);
-
-    stream.println(currentRecord.toString());
-
-    // reset current record status
-    currentRecord.delete(0, currentRecord.length());
-    fRecordStarted = false;
-  }
-
-  @Override
-  public void cleanup() throws IOException {
-    super.cleanup();
-    if (stream != null) {
-      stream.close();
-      stream = null;
-      logger.debug("CSVWriter: closing file");
-    }
-  }
-
-  @Override
-  public void abort() throws IOException {
-    cleanup();
-    try {
-      fs.delete(new Path(location), true);
-    } catch (IOException ex) {
-      logger.error("Abort failed. There could be leftover output files");
-      throw ex;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index a9a277b..698186c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.List;
@@ -38,13 +39,19 @@ import static org.junit.Assert.assertTrue;
 
 public class TestWriter extends BaseTestQuery {
 
-  @Test
-  public void testSimpleCsv() throws Exception {
-    // before executing the test deleting the existing CSV files in /tmp/csvtest
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
     Configuration conf = new Configuration();
     conf.set("fs.name.default", "local");
 
-    FileSystem fs = FileSystem.get(conf);
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void simpleCsv() throws Exception {
+    // before executing the test deleting the existing CSV files in /tmp/csvtest
     Path path = new Path("/tmp/csvtest");
     if (fs.exists(path)) {
       fs.delete(path, true);
@@ -80,4 +87,43 @@ public class TestWriter extends BaseTestQuery {
     }
     batchLoader.clear();
   }
+
+  @Test
+  public void simpleCTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE simplectas AS SELECT * FROM cp.`employee.json`;";
+
+    ctasHelper("/tmp/drilltest/simplectas", testQuery, 1);
+  }
+
+  @Test
+  public void complex1CTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE complex1ctas AS SELECT first_name, last_name, position_id FROM cp.`employee.json`;";
+
+    ctasHelper("/tmp/drilltest/complex1ctas", testQuery, 1);
+  }
+
+  @Test
+  public void complex2CTAS() throws Exception {
+    String testQuery = "USE dfs.tmp;" +
+        "CREATE TABLE complex2ctas AS SELECT CAST(`birth_date` as Timestamp) FROM cp.`employee.json` GROUP BY birth_date;";
+
+    ctasHelper("/tmp/drilltest/complex2ctas", testQuery, 3);
+  }
+
+
+  private void ctasHelper(String tableDir, String testQuery, int numExpectedFiles) throws Exception {
+    Path tableLocation = new Path(tableDir);
+    if (fs.exists(tableLocation)){
+      fs.delete(tableLocation, true);
+    }
+
+    test(testQuery);
+
+    assertTrue(fs.exists(tableLocation));
+    FileStatus[] fileStatuses = fs.globStatus(new Path(tableLocation.toString(), "*.csv"));
+    assertEquals(numExpectedFiles, fileStatuses.length);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-plugins.json b/exec/java-exec/src/test/resources/storage-plugins.json
index 33f4fac..020805e 100644
--- a/exec/java-exec/src/test/resources/storage-plugins.json
+++ b/exec/java-exec/src/test/resources/storage-plugins.json
@@ -3,7 +3,18 @@
     dfs: {
       type: "file",
       connection: "file:///",
-    formats: {
+      workspaces: {
+        "home" : {
+          location: "/",
+          writable: false
+        },
+        "tmp" : {
+          location: "/tmp/drilltest",
+          writable: true,
+          storageformat: "csv"
+        }
+      },
+      formats: {
             "psv" : {
               type: "text",
               extensions: [ "tbl" ],
@@ -37,4 +48,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
index f726ce8..ff670d5 100644
--- a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
+++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
@@ -14,52 +14,67 @@
        entries:[
          {records: 66000, types: [
            {name: "red", type: "INT", mode: "REQUIRED"},
-           {name: "green", type: "INT", mode: "REQUIRED"},
-           {name: "blue", type: "INT", mode: "REQUIRED"}
+           {name: "green", type: "BIGINT", mode: "OPTIONAL"},
+           {name: "blue", type: "VARCHAR", mode: "REQUIRED"}
          ]},
          {records: 66000, types: [
-           {name: "blue", type: "INT", mode: "REQUIRED"},
-           {name: "green", type: "INT", mode: "REQUIRED"},
-           {name: "red", type: "INT", mode: "REQUIRED"}
+           {name: "blue", type: "BIT", mode: "REQUIRED"},
+           {name: "green", type: "DECIMAL18", mode: "REQUIRED"},
+           {name: "red", type: "FLOAT8", mode: "OPTIONAL"}
          ]}
        ]
     }, {
-      @id:2,
+      @id: 2,
       child: 1,
-      pop:"project",
-      exprs: [
-        { ref: "col1", expr:"red" },
-        { ref: "col2", expr:"green" },
-        { ref: "col3", expr:"blue" }
-      ]
-    }, {
-      @id: 3,
-      child: 2,
-      pop: "writer",
-      createTableEntry: {
-        "type" : "filesystem",
-        "config" : {
-          "type" : "file",
-          "connection" : "file:///",
-          "workspaces" : {
-            "root" : {
-              "location" : "/",
-              "writable" : false,
-              "storageformat" : null
-            },
-            "tmp" : {
-              "location" : "/tmp",
-              "writable" : true,
-              "storageformat" : "csv"
-            }
+      pop: "fs-writer",
+      "location" : "/tmp/csvtest",
+      "storage" : {
+        "type" : "file",
+        "connection" : "file:///",
+        "workspaces" : {
+          "root" : {
+            "location" : "/",
+            "writable" : false,
+            "storageformat" : null
+          },
+          "tmp" : {
+            "location" : "/tmp",
+            "writable" : true,
+            "storageformat" : "csv"
           }
         },
-        "format" : "csv",
-        "location" : "/tmp/csvtest"
+        "formats" : {
+          "psv" : {
+            "type" : "text",
+            "extensions" : [ "tbl" ],
+            "delimiter" : "|"
+          },
+          "csv" : {
+            "type" : "text",
+            "extensions" : [ "csv" ],
+            "delimiter" : ","
+          },
+          "tsv" : {
+            "type" : "text",
+            "extensions" : [ "tsv" ],
+            "delimiter" : "\t"
+          },
+          "parquet" : {
+            "type" : "parquet"
+          },
+          "json" : {
+            "type" : "json"
+          }
+        }
+      },
+      "format" : {
+        "type" : "text",
+        "extensions" : [ "csv" ],
+        "delimiter" : ","
       }
     }, {
-      @id: 4,
-      child: 3,
+      @id: 3,
+      child: 2,
       pop: "screen"
     }
   ]