You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 01:08:49 UTC
[2/8] DRILL-679: Support create table as query (CTAS) (contd.).
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
new file mode 100644
index 0000000..864ae48
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("fs-writer")
+public class EasyWriter extends AbstractWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
+
+ private final String location;
+ private final EasyFormatPlugin<?> formatPlugin;
+
+ @JsonCreator
+ public EasyWriter(
+ @JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("location") String location,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
+
+ super(child);
+ this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+ Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+ this.location = location;
+ }
+
+ public EasyWriter(PhysicalOperator child,
+ String location,
+ EasyFormatPlugin<?> formatPlugin) {
+
+ super(child);
+ this.formatPlugin = formatPlugin;
+ this.location = location;
+ }
+
+ @JsonProperty("location")
+ public String getLocation() {
+ return location;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig(){
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig(){
+ return formatPlugin.getConfig();
+ }
+
+ @JsonIgnore
+ public EasyFormatPlugin getFormatPlugin(){
+ return formatPlugin;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new EasyWriter(child, location, formatPlugin);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ // TODO:
+ return new OperatorCost(1,1,1,1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
new file mode 100644
index 0000000..c91ceba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Creates the record batch for an {@link EasyWriter} operator by delegating to the
+ * writer's format plugin. The writer must have exactly one incoming batch.
+ */
+public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
+
+  /**
+   * @param context fragment context passed through to the format plugin
+   * @param config writer operator definition
+   * @param children incoming batches; exactly one is required
+   * @return the writer batch created by the writer's format plugin
+   * @throws ExecutionSetupException if {@code children} does not contain exactly one batch,
+   *     or if the format plugin fails to create the batch
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    // Explicit check rather than `assert`: assertions are usually disabled in production,
+    // which would let a malformed plan fail later with a less helpful error.
+    if (children == null || children.size() != 1) {
+      throw new ExecutionSetupException(String.format(
+          "EasyWriter expects exactly one child batch but got %d.", children == null ? 0 : children.size()));
+    }
+    return config.getFormatPlugin().getWriterBatch(context, children.get(0), config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 8bdb1ee..872052c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.easy.json;
+import java.io.IOException;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -29,7 +30,9 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
@@ -52,6 +55,11 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
}
+ @Override
+ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+ throw new UnsupportedOperationException("Json Writer is not supported currently.");
+ }
+
@JsonTypeName("json")
public static class JSONFormatConfig implements FormatPluginConfig {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 0dbed89..f6cc58e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,34 +17,37 @@
*/
package org.apache.drill.exec.store.easy.text;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Maps;
import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
@@ -71,6 +74,27 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project?
}
+  /**
+   * Creates and initializes a {@link DrillTextRecordWriter} for the given writer operator.
+   * Files are written to the writer's location, using this plugin's delimiter and the
+   * first configured extension. File names are prefixed with the fragment's
+   * "majorFragmentId_minorFragmentId".
+   *
+   * @param context fragment context, used for the fragment handle
+   * @param writer writer operator providing the location and storage config
+   * @return an initialized record writer
+   * @throws IllegalStateException if the text format config declares no extension
+   * @throws IOException if the record writer fails to initialize
+   */
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    TextFormatConfig config = (TextFormatConfig) getConfig();
+    List<String> extensions = config.getExtensions();
+    // Previously get(0) on a missing/empty list failed with an uninformative exception.
+    Preconditions.checkState(extensions != null && !extensions.isEmpty(),
+        "Text format config must declare at least one extension to be used for writing.");
+
+    FragmentHandle handle = context.getHandle();
+    String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+
+    Map<String, String> options = Maps.newHashMap();
+    options.put("location", writer.getLocation());
+    options.put("prefix", fragmentId);
+    options.put("separator", config.getDelimiter());
+    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).connection);
+    options.put("extension", extensions.get(0));
+
+    RecordWriter recordWriter = new DrillTextRecordWriter();
+    recordWriter.init(options);
+    return recordWriter;
+  }
+
@JsonTypeName("text")
public static class TextFormatConfig implements FormatPluginConfig {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index ec6456b..a10d30f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -24,6 +24,8 @@ import java.util.regex.Pattern;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
@@ -101,6 +103,11 @@ public class ParquetFormatPlugin implements FormatPlugin{
}
@Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
+ throw new UnsupportedOperationException("Parquet Writer is not supported currently.");
+ }
+
+ @Override
public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
new file mode 100644
index 0000000..b6840f8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import com.google.common.base.Joiner;
+import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+public class DrillTextRecordWriter extends StringOutputRecordWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+
+ private String location;
+ private String prefix;
+
+ private String fieldDelimiter;
+ private String extension;
+
+ private static String eol = System.getProperty("line.separator");
+ private int index;
+ private PrintStream stream = null;
+ private FileSystem fs = null;
+
+ // Record write status
+ private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
+ private StringBuilder currentRecord; // contains the current record separated by field delimiter
+
+ @Override
+ public void init(Map<String, String> writerOptions) throws IOException {
+ this.location = writerOptions.get("location");
+ this.prefix = writerOptions.get("prefix");
+ this.fieldDelimiter = writerOptions.get("separator");
+ this.extension = writerOptions.get("extension");
+
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ this.fs = FileSystem.get(conf);
+
+ this.currentRecord = new StringBuilder();
+ this.index = 0;
+ }
+
+ @Override
+ public void startNewSchema(List<String> columnNames) throws IOException {
+ // wrap up the current file
+ cleanup();
+
+ // open a new file for writing data with new schema
+ Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+ try {
+ DataOutputStream fos = fs.create(fileName);
+ stream = new PrintStream(fos);
+ logger.debug("Created file: {}", fileName);
+ } catch (IOException ex) {
+ logger.error("Unable to create file: " + fileName, ex);
+ throw ex;
+ }
+ index++;
+
+ stream.println(Joiner.on(fieldDelimiter).join(columnNames));
+ }
+
+ @Override
+ public void addField(int fieldId, String value) throws IOException {
+ currentRecord.append(value + fieldDelimiter);
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ if (fRecordStarted)
+ throw new IOException("Previous record is not written completely");
+
+ fRecordStarted = true;
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ if (!fRecordStarted)
+ throw new IOException("No record is in writing");
+
+ // remove the extra delimiter at the end
+ currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length());
+
+ stream.println(currentRecord.toString());
+
+ // reset current record status
+ currentRecord.delete(0, currentRecord.length());
+ fRecordStarted = false;
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ super.cleanup();
+ if (stream != null) {
+ stream.close();
+ stream = null;
+ logger.debug("closing file");
+ }
+ }
+
+ @Override
+ public void abort() throws IOException {
+ cleanup();
+ try {
+ fs.delete(new Path(location), true);
+ } catch (IOException ex) {
+ logger.error("Abort failed. There could be leftover output files");
+ throw ex;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
deleted file mode 100644
index be7b873..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/RecordWriterTemplate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
-public @interface RecordWriterTemplate {
-
- /**
- * Output format identifier used to identify the RecordWriter implementation.
- * @return
- */
- String format();
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
deleted file mode 100644
index 8e7c58c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/writer/csv/CSVRecordWriter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.writer.csv;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import org.apache.drill.exec.store.StringOutputRecordWriter;
-import org.apache.drill.exec.store.writer.RecordWriterTemplate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-@RecordWriterTemplate(format = "csv")
-public class CSVRecordWriter extends StringOutputRecordWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CSVRecordWriter.class);
-
- private String location; // directory where to write the CSV files
- private String prefix; // prefix to output file names.
- private int index;
-
- private PrintStream stream = null;
- private FileSystem fs = null;
-
- // Record write status
- private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
- private StringBuilder currentRecord; // contains the current record separated by commas
-
- private static String eol = System.getProperty("line.separator");
-
- @Override
- public void init(Map<String, String> writerOptions) throws IOException {
- this.location = writerOptions.get("location");
- this.prefix = writerOptions.get("prefix");
- this.index = 0;
-
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
- this.fs = FileSystem.get(conf);
-
- currentRecord = new StringBuilder();
- }
-
- @Override
- public void startNewSchema(List<String> columnNames) throws IOException {
- // wrap up the current file
- cleanup();
-
- // open a new file for writing data with new schema
- Path fileName = new Path(location, prefix + "_" + index + ".csv");
- try {
- DataOutputStream fos = fs.create(fileName);
- stream = new PrintStream(fos);
- logger.debug("CSVWriter: created file: {}", fileName);
- } catch (IOException ex) {
- logger.error("Unable to create file: " + fileName, ex);
- throw ex;
- }
- index++;
-
- stream.println(Joiner.on(",").join(columnNames));
- }
-
- @Override
- public void addField(int fieldId, String value) throws IOException {
- currentRecord.append(value + ",");
- }
-
- @Override
- public void startRecord() throws IOException {
- if (fRecordStarted)
- throw new IOException("Previous record is not written completely");
-
- fRecordStarted = true;
- }
-
- @Override
- public void endRecord() throws IOException {
- if (!fRecordStarted)
- throw new IOException("No record is in writing");
-
- // remove the extra "," at the end
- currentRecord.deleteCharAt(currentRecord.length()-1);
-
- stream.println(currentRecord.toString());
-
- // reset current record status
- currentRecord.delete(0, currentRecord.length());
- fRecordStarted = false;
- }
-
- @Override
- public void cleanup() throws IOException {
- super.cleanup();
- if (stream != null) {
- stream.close();
- stream = null;
- logger.debug("CSVWriter: closing file");
- }
- }
-
- @Override
- public void abort() throws IOException {
- cleanup();
- try {
- fs.delete(new Path(location), true);
- } catch (IOException ex) {
- logger.error("Abort failed. There could be leftover output files");
- throw ex;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index a9a277b..698186c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
@@ -38,13 +39,19 @@ import static org.junit.Assert.assertTrue;
public class TestWriter extends BaseTestQuery {
- @Test
- public void testSimpleCsv() throws Exception {
- // before executing the test deleting the existing CSV files in /tmp/csvtest
+ static FileSystem fs;
+
+ @BeforeClass
+ public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.name.default", "local");
- FileSystem fs = FileSystem.get(conf);
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void simpleCsv() throws Exception {
+ // before executing the test deleting the existing CSV files in /tmp/csvtest
Path path = new Path("/tmp/csvtest");
if (fs.exists(path)) {
fs.delete(path, true);
@@ -80,4 +87,43 @@ public class TestWriter extends BaseTestQuery {
}
batchLoader.clear();
}
+
+  @Test
+  public void simpleCTAS() throws Exception {
+    // Copy every column of employee.json into a new table in the writable dfs.tmp workspace.
+    String createTable = "CREATE TABLE simplectas AS SELECT * FROM cp.`employee.json`;";
+    ctasHelper("/tmp/drilltest/simplectas", "USE dfs.tmp;" + createTable, 1);
+  }
+
+  @Test
+  public void complex1CTAS() throws Exception {
+    // Project a subset of the employee.json columns into a new table.
+    String createTable =
+        "CREATE TABLE complex1ctas AS SELECT first_name, last_name, position_id FROM cp.`employee.json`;";
+    ctasHelper("/tmp/drilltest/complex1ctas", "USE dfs.tmp;" + createTable, 1);
+  }
+
+  @Test
+  public void complex2CTAS() throws Exception {
+    // Aggregating query (GROUP BY) with a cast; the expected file count is 3.
+    String createTable =
+        "CREATE TABLE complex2ctas AS SELECT CAST(`birth_date` as Timestamp) FROM cp.`employee.json` GROUP BY birth_date;";
+    ctasHelper("/tmp/drilltest/complex2ctas", "USE dfs.tmp;" + createTable, 3);
+  }
+
+
+  /**
+   * Runs a CTAS query and verifies the table directory it produces.
+   *
+   * @param tableDir directory the table is expected in; deleted before the query runs
+   * @param testQuery query text containing the CTAS statement
+   * @param numExpectedFiles number of "*.csv" files expected in {@code tableDir} afterwards
+   */
+  private void ctasHelper(String tableDir, String testQuery, int numExpectedFiles) throws Exception {
+    Path tableLocation = new Path(tableDir);
+    // Remove leftovers from earlier runs so the file count below reflects only this query.
+    if (fs.exists(tableLocation)) {
+      fs.delete(tableLocation, true);
+    }
+
+    test(testQuery);
+
+    assertTrue("Table directory was not created: " + tableLocation, fs.exists(tableLocation));
+    FileStatus[] fileStatuses = fs.globStatus(new Path(tableLocation.toString(), "*.csv"));
+    // Hadoop's globStatus may return null rather than an empty array for some non-matching
+    // paths; count that as zero files instead of failing with a NullPointerException.
+    int numFiles = fileStatuses == null ? 0 : fileStatuses.length;
+    assertEquals("Unexpected number of CSV files in " + tableLocation, numExpectedFiles, numFiles);
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/storage-plugins.json b/exec/java-exec/src/test/resources/storage-plugins.json
index 33f4fac..020805e 100644
--- a/exec/java-exec/src/test/resources/storage-plugins.json
+++ b/exec/java-exec/src/test/resources/storage-plugins.json
@@ -3,7 +3,18 @@
dfs: {
type: "file",
connection: "file:///",
- formats: {
+ workspaces: {
+ "home" : {
+ location: "/",
+ writable: false
+ },
+ "tmp" : {
+ location: "/tmp/drilltest",
+ writable: true,
+ storageformat: "csv"
+ }
+ },
+ formats: {
"psv" : {
type: "text",
extensions: [ "tbl" ],
@@ -37,4 +48,4 @@
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b7f351e/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
index f726ce8..ff670d5 100644
--- a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
+++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
@@ -14,52 +14,67 @@
entries:[
{records: 66000, types: [
{name: "red", type: "INT", mode: "REQUIRED"},
- {name: "green", type: "INT", mode: "REQUIRED"},
- {name: "blue", type: "INT", mode: "REQUIRED"}
+ {name: "green", type: "BIGINT", mode: "OPTIONAL"},
+ {name: "blue", type: "VARCHAR", mode: "REQUIRED"}
]},
{records: 66000, types: [
- {name: "blue", type: "INT", mode: "REQUIRED"},
- {name: "green", type: "INT", mode: "REQUIRED"},
- {name: "red", type: "INT", mode: "REQUIRED"}
+ {name: "blue", type: "BIT", mode: "REQUIRED"},
+ {name: "green", type: "DECIMAL18", mode: "REQUIRED"},
+ {name: "red", type: "FLOAT8", mode: "OPTIONAL"}
]}
]
}, {
- @id:2,
+ @id: 2,
child: 1,
- pop:"project",
- exprs: [
- { ref: "col1", expr:"red" },
- { ref: "col2", expr:"green" },
- { ref: "col3", expr:"blue" }
- ]
- }, {
- @id: 3,
- child: 2,
- pop: "writer",
- createTableEntry: {
- "type" : "filesystem",
- "config" : {
- "type" : "file",
- "connection" : "file:///",
- "workspaces" : {
- "root" : {
- "location" : "/",
- "writable" : false,
- "storageformat" : null
- },
- "tmp" : {
- "location" : "/tmp",
- "writable" : true,
- "storageformat" : "csv"
- }
+ pop: "fs-writer",
+ "location" : "/tmp/csvtest",
+ "storage" : {
+ "type" : "file",
+ "connection" : "file:///",
+ "workspaces" : {
+ "root" : {
+ "location" : "/",
+ "writable" : false,
+ "storageformat" : null
+ },
+ "tmp" : {
+ "location" : "/tmp",
+ "writable" : true,
+ "storageformat" : "csv"
}
},
- "format" : "csv",
- "location" : "/tmp/csvtest"
+ "formats" : {
+ "psv" : {
+ "type" : "text",
+ "extensions" : [ "tbl" ],
+ "delimiter" : "|"
+ },
+ "csv" : {
+ "type" : "text",
+ "extensions" : [ "csv" ],
+ "delimiter" : ","
+ },
+ "tsv" : {
+ "type" : "text",
+ "extensions" : [ "tsv" ],
+ "delimiter" : "\t"
+ },
+ "parquet" : {
+ "type" : "parquet"
+ },
+ "json" : {
+ "type" : "json"
+ }
+ }
+ },
+ "format" : {
+ "type" : "text",
+ "extensions" : [ "csv" ],
+ "delimiter" : ","
}
}, {
- @id: 4,
- child: 3,
+ @id: 3,
+ child: 2,
pop: "screen"
}
]