You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/01/24 06:15:21 UTC
[1/5] drill git commit: DRILL-4956: Temporary tables support
Repository: drill
Updated Branches:
refs/heads/master 8a4d7a994 -> 2af709f43
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
new file mode 100644
index 0000000..a125bae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Holds folder permission, file permission and delete on exit flag that are applied to paths / files stored on file system. */
+public class StorageStrategy {
+
+ /**
+ * Primarily used for persistent tables.
+ * For directories: drwxrwxr-x (owner and group have full access, others can read and execute).
+ * For files: -rw-r--r-- (owner can read and write, group and others can read).
+ * Folders and files are not deleted on file system close.
+ */
+ public static final StorageStrategy PERSISTENT = new StorageStrategy("775", "644", false);
+
+ /**
+ * Primarily used for temporary tables.
+ * For directories: drwx------ (owner has full access, group and others have no access).
+ * For files: -rw------- (owner can read and write, group and others have no access).
+ * Folders and files are deleted on file system close.
+ */
+ public static final StorageStrategy TEMPORARY = new StorageStrategy("700", "600", true);
+
+ private final String folderPermission; // permission string applied to newly created directories
+ private final String filePermission; // permission string applied to newly created files
+ private final boolean deleteOnExit; // if true, created paths are added to file system delete on exit list
+
+ @JsonCreator
+ public StorageStrategy(@JsonProperty("folderPermission") String folderPermission,
+ @JsonProperty("filePermission") String filePermission,
+ @JsonProperty("deleteOnExit") boolean deleteOnExit) {
+ this.folderPermission = folderPermission;
+ this.filePermission = filePermission;
+ this.deleteOnExit = deleteOnExit;
+ }
+
+ public String getFolderPermission() {
+ return folderPermission;
+ }
+
+ public String getFilePermission() { return filePermission; }
+
+ public boolean isDeleteOnExit() {
+ return deleteOnExit;
+ }
+
+ /**
+ * Creates passed path on appropriate file system.
+ * Before creation checks which parent directories do not exist.
+ * Applies storage strategy rules to all newly created directories.
+ * Will return first created path or null if passed path already existed.
+ *
+ * Case 1: /a/b -> already exists, attempt to create /a/b/c/d
+ * Will create path and return /a/b/c.
+ * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/d
+ * Will create path and return /a/b/c/d.
+ * Case 3: /a/b/c/d -> already exists, will return null.
+ *
+ * @param fs file system where file should be located
+ * @param path location path
+ * @return first created parent path or passed path, null if passed path already existed
+ * @throws IOException is thrown in case of problems while creating path, setting permission
+ * or adding path to delete on exit list
+ */
+ public Path createPathAndApply(FileSystem fs, Path path) throws IOException {
+ List<Path> locations = getNonExistentLocations(fs, path);
+ if (locations.isEmpty()) {
+ return null;
+ }
+ fs.mkdirs(path);
+ for (Path location : locations) {
+ applyStrategy(fs, location, folderPermission, deleteOnExit);
+ }
+ return locations.get(locations.size() - 1); // last element is the top-most location that did not exist
+ }
+
+ /**
+ * Creates passed file on appropriate file system.
+ * Before creation checks which parent directories do not exist.
+ * Applies storage strategy rules to all newly created directories and file.
+ * Will return first created parent path or file if no new parent paths created.
+ *
+ * Case 1: /a/b -> already exists, attempt to create /a/b/c/some_file.txt
+ * Will create file and return /a/b/c.
+ * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/some_file.txt
+ * Will create file and return /a/b/c/some_file.txt.
+ * Case 3: /a/b/c/some_file.txt -> already exists, will fail with IOException.
+ *
+ * @param fs file system where file should be located
+ * @param file file path
+ * @return first created parent path or file
+ * @throws IOException is thrown in case of problems while creating path, setting permission
+ * or adding path to delete on exit list
+ */
+ public Path createFileAndApply(FileSystem fs, Path file) throws IOException {
+ List<Path> locations = getNonExistentLocations(fs, file.getParent()); // collected before the file is created
+ if (!fs.createNewFile(file)) {
+ throw new IOException(String.format("File [%s] already exists on file system [%s].",
+ file.toUri().getPath(), fs.getUri()));
+ }
+ applyToFile(fs, file);
+
+ if (locations.isEmpty()) {
+ return file;
+ }
+
+ for (Path location : locations) {
+ applyStrategy(fs, location, folderPermission, deleteOnExit);
+ }
+ return locations.get(locations.size() - 1); // last element is the top-most parent that did not exist
+ }
+
+ /**
+ * Applies storage strategy to file:
+ * sets file permission and adds file to file system delete on exit list if needed.
+ *
+ * @param fs file system
+ * @param file path to file
+ * @throws IOException is thrown in case of problems while setting permission
+ * or adding file to delete on exit list
+ */
+ public void applyToFile(FileSystem fs, Path file) throws IOException {
+ applyStrategy(fs, file, filePermission, deleteOnExit);
+ }
+
+ /**
+ * Returns list of parent locations that do not exist, including initial location.
+ * First in the list will be initial location,
+ * last in the list will be last parent location that does not exist.
+ * If all locations exist, empty list will be returned.
+ *
+ * Case 1: if /a/b exists and passed location is /a/b/c/d,
+ * will return list with two elements: 0 -> /a/b/c/d, 1 -> /a/b/c
+ * Case 2: if /a/b exists and passed location is /a/b, will return empty list.
+ *
+ * @param fs file system where locations should be located
+ * @param path location path, may be null (empty list is returned in this case)
+ * @return list of locations that do not exist
+ * @throws IOException in case of troubles accessing file system
+ */
+ private List<Path> getNonExistentLocations(FileSystem fs, Path path) throws IOException {
+ List<Path> locations = Lists.newArrayList();
+ Path starting = path;
+ while (starting != null && !fs.exists(starting)) { // walk up until an existing location or the root's parent (null)
+ locations.add(starting);
+ starting = starting.getParent();
+ }
+ return locations;
+ }
+
+ /**
+ * Applies storage strategy to passed path on passed file system.
+ * Sets appropriate permission
+ * and adds to file system delete on exit list if needed.
+ *
+ * @param fs file system where path is located
+ * @param path path location
+ * @param permission permission to be applied
+ * @param deleteOnExit whether to add path to file system delete on exit list
+ * @throws IOException is thrown in case of problems while setting permission
+ * or adding path to delete on exit list
+ */
+ private void applyStrategy(FileSystem fs, Path path, String permission, boolean deleteOnExit) throws IOException {
+ fs.setPermission(path, new FsPermission(permission));
+ if (deleteOnExit) {
+ fs.deleteOnExit(path);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index e502e99..2110f38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -63,8 +63,8 @@ public class SubSchemaWrapper extends AbstractSchema {
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
- return innerSchema.createNewTable(tableName, partitionColumns);
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+ return innerSchema.createNewTable(tableName, partitionColumns, storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index e0f5438..3a89591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -288,6 +288,9 @@ public class FileSelection {
}
final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+ if (fileSel == null) {
+ return null;
+ }
fileSel.setHadWildcard(hasWildcard);
return fileSel;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 526dfb1..e3e01c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,12 +24,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -150,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
- return defaultSchema.createNewTable(tableName, partitionColumns);
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+ return defaultSchema.createNewTable(tableName, partitionColumns, storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index dac313b..8416ed8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -53,13 +53,13 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
import org.apache.drill.exec.dotdrill.DotDrillType;
import org.apache.drill.exec.dotdrill.DotDrillUtil;
import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DrillViewTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -522,7 +522,6 @@ public class WorkspaceSchemaFactory {
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
}
-
return tables.get(tableKey);
}
@@ -540,7 +539,7 @@ public class WorkspaceSchemaFactory {
}
@Override
- public CreateTableEntry createNewTable(String tableName, List<String> partitonColumns) {
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
if (formatPlugin == null) {
@@ -553,7 +552,8 @@ public class WorkspaceSchemaFactory {
(FileSystemConfig) plugin.getConfig(),
formatPlugin,
config.getLocation() + Path.SEPARATOR + tableName,
- partitonColumns);
+ partitionColumns,
+ storageStrategy);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index db22568..52ce8b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.StoragePluginRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -48,6 +48,7 @@ public class EasyWriter extends AbstractWriter {
@JsonProperty("child") PhysicalOperator child,
@JsonProperty("location") String location,
@JsonProperty("partitionColumns") List<String> partitionColumns,
+ @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JsonProperty("format") FormatPluginConfig formatConfig,
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -57,6 +58,7 @@ public class EasyWriter extends AbstractWriter {
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.location = location;
this.partitionColumns = partitionColumns;
+ setStorageStrategy(storageStrategy);
}
public EasyWriter(PhysicalOperator child,
@@ -92,7 +94,9 @@ public class EasyWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new EasyWriter(child, location, partitionColumns, formatPlugin);
+ EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin);
+ writer.setStorageStrategy(getStorageStrategy());
+ return writer;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 30c248e..58ca95f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -83,7 +83,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
- RecordWriter recordWriter = new JsonRecordWriter();
+ RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
recordWriter.init(options);
return recordWriter;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index f27e04c..c37da8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
@@ -46,6 +46,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
private static final String LINE_FEED = String.format("%n");
+ private Path cleanUpLocation;
private String location;
private String prefix;
@@ -58,11 +59,13 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private FSDataOutputStream stream = null;
private final JsonFactory factory = new JsonFactory();
+ private final StorageStrategy storageStrategy;
// Record write status
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
- public JsonRecordWriter(){
+ public JsonRecordWriter(StorageStrategy storageStrategy){
+ this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
}
@Override
@@ -81,7 +84,17 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
+ // json writer does not support partitions, so only one file can be created
+ // and thus only one location should be deleted in case of abort
+ // to ensure that our writer was the first to create output file,
+ // we create empty output file first and fail if file exists
+ cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+ // since empty output file will be overwritten (some file systems may restrict append option)
+ // we need to re-apply file permission
stream = fs.create(fileName);
+ storageStrategy.applyToFile(fs, fileName);
+
JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter();
if (uglify) {
generator = generator.setPrettyPrinter(new MinimalPrettyPrinter(LINE_FEED));
@@ -238,6 +251,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
@Override
public void abort() throws IOException {
+ if (cleanUpLocation != null) { // nothing to clean up if no location was recorded by this writer
+ fs.delete(cleanUpLocation, true); // recursive delete: recorded location may be a parent folder, not only the output file
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ cleanUpLocation.toUri().getPath(), fs.getUri());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 6542ad4..a9a30e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -125,7 +125,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
- RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator());
+ RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
recordWriter.init(options);
return recordWriter;
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4ee863a..a25699d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -80,6 +81,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public static final String DRILL_VERSION_PROPERTY = "drill.version";
public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
+ private final StorageStrategy storageStrategy;
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
private Map<String, String> extraMetaData = new HashMap<>();
@@ -101,7 +103,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private BatchSchema batchSchema;
private Configuration conf;
+ private FileSystem fs;
private String location;
+ private List<Path> cleanUpLocations;
private String prefix;
private int index = 0;
private OperatorContext oContext;
@@ -117,6 +121,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
+ this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.PERSISTENT : writer.getStorageStrategy();
+ this.cleanUpLocations = Lists.newArrayList();
}
@Override
@@ -126,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ fs = FileSystem.get(conf);
blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
@@ -363,7 +370,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
// we wait until there is at least one record before creating the parquet file
if (parquetFileWriter == null) {
Path path = new Path(location, prefix + "_" + index + ".parquet");
- parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+ // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+ Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+ // since parquet writer supports partitions, it means that several output files may be created
+ // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+ // if table location was created before, we store only files created by this writer and delete them in case of abort
+ addCleanUpLocation(fs, firstCreatedPath);
+
+ // since ParquetFileWriter will overwrite empty output file (append is not supported)
+ // we need to re-apply file permission
+ parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+ storageStrategy.applyToFile(fs, path);
+
parquetFileWriter.start();
}
@@ -374,6 +393,24 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void abort() throws IOException {
+ List<String> errors = Lists.newArrayList(); // paths that could not be deleted, reported together after all locations were tried
+ for (Path location : cleanUpLocations) {
+ try {
+ if (fs.exists(location)) {
+ fs.delete(location, true); // recursive: location may be the table folder created by this writer, still holding output files
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ location.toUri().getPath(), fs.getUri());
+ }
+ } catch (IOException e) { // keep going so one failure does not prevent clean up of remaining locations
+ errors.add(location.toUri().getPath());
+ logger.error("Failed to delete location [{}] on file system [{}].",
+ location, fs.getUri(), e);
+ }
+ }
+ if (!errors.isEmpty()) { // surface all failed locations to the caller in a single exception
+ throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" +
+ " during aborting writer", errors, fs.getUri()));
+ }
}
@Override
@@ -382,4 +419,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
codecFactory.release();
}
+
+ /**
+ * Adds passed location to the list of locations to be cleaned up in case of abort.
+ * Location is added only if one of the following is true:
+ * <ul><li>no locations were added before</li>
+ * <li>first added location is a file</li></ul>
+ *
+ * If first added location is a folder, we don't add other locations (which can be only files),
+ * since this writer was the one to create main folder where files are located,
+ * on abort we'll delete this folder with its content.
+ *
+ * If first location is a file, then we add other files, since this writer didn't create main folder
+ * and on abort we need to delete only created files but not the whole folder.
+ *
+ * @param fs file system where location is created
+ * @param location passed location
+ * @throws IOException in case of errors during check if first added location is a file
+ */
+ private void addCleanUpLocation(FileSystem fs, Path location) throws IOException {
+ if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) {
+ cleanUpLocations.add(location);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 716c56d..522c678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -61,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
@JsonProperty("child") PhysicalOperator child,
@JsonProperty("location") String location,
@JsonProperty("partitionColumns") List<String> partitionColumns,
+ @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -69,6 +70,7 @@ public class ParquetWriter extends AbstractWriter {
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
this.location = location;
this.partitionColumns = partitionColumns;
+ setStorageStrategy(storageStrategy);
}
public ParquetWriter(PhysicalOperator child,
@@ -109,7 +111,9 @@ public class ParquetWriter extends AbstractWriter {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new ParquetWriter(child, location, partitionColumns, formatPlugin);
+ ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+ writer.setStorageStrategy(getStorageStrategy());
+ return writer;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 8a74b49..d65a3eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StringOutputRecordWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -37,6 +37,10 @@ import com.google.common.base.Joiner;
public class DrillTextRecordWriter extends StringOutputRecordWriter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+ private final StorageStrategy storageStrategy;
+
+ private Path cleanUpLocation;
+
private String location;
private String prefix;
@@ -52,8 +56,9 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
private StringBuilder currentRecord; // contains the current record separated by field delimiter
- public DrillTextRecordWriter(BufferAllocator allocator) {
+ public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
super(allocator);
+ this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
}
@Override
@@ -79,7 +84,17 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
// open a new file for writing data with new schema
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
+ // drill text writer does not support partitions, so only one file can be created
+ // and thus only one location should be deleted in case of abort
+ // to ensure that our writer was the first to create output file,
+ // we create empty output file first and fail if file exists
+ cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+ // since empty output file will be overwritten (some file systems may restrict append option)
+ // we need to re-apply file permission
DataOutputStream fos = fs.create(fileName);
+ storageStrategy.applyToFile(fs, fileName);
+
stream = new PrintStream(fos);
logger.debug("Created file: {}", fileName);
} catch (IOException ex) {
@@ -160,12 +175,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
@Override
public void abort() throws IOException {
- cleanup();
- try {
- fs.delete(new Path(location), true);
- } catch (IOException ex) {
- logger.error("Abort failed. There could be leftover output files");
- throw ex;
+ if (cleanUpLocation != null) {
+ fs.delete(cleanUpLocation, true);
+ logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+ cleanUpLocation.toUri().getPath(), fs.getUri());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 01e4be0..735ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -203,7 +203,7 @@ drill.exec: {
scan: {
threadpool_size: 8,
decode_threadpool_size: 1
- }
+ },
udf: {
retry-attempts: 5,
directory: {
@@ -227,7 +227,11 @@ drill.exec: {
registry: ${drill.exec.udf.directory.base}"/registry",
tmp: ${drill.exec.udf.directory.base}"/tmp"
}
- }
+ },
+ # Temporary tables can be created ONLY in the default temporary workspace.
+ # The full workspace name must be specified (schema and workspace separated by a dot).
+ # The workspace MUST be file-based and writable. The workspace name is case-sensitive.
+ default_temporary_workspace: "dfs.tmp"
}
drill.jdbc: {
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 93916e9..fb84088 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -84,6 +84,7 @@ public class BaseTestQuery extends ExecTest {
{
put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
put(ExecConstants.HTTP_ENABLE, "false");
+ put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index e9a38b0..acbf2e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -177,7 +177,7 @@ public class TestDropTable extends PlanTestBase {
@Test // DRILL-4673
public void testDropTableIfExistsWhileTableExists() throws Exception {
- final String existentTableName = "test_table";
+ final String existentTableName = "test_table_exists";
test("use dfs_test.tmp");
// successful dropping of existent table
@@ -192,7 +192,7 @@ public class TestDropTable extends PlanTestBase {
@Test // DRILL-4673
public void testDropTableIfExistsWhileTableDoesNotExist() throws Exception {
- final String nonExistentTableName = "test_table";
+ final String nonExistentTableName = "test_table_not_exists";
test("use dfs_test.tmp");
// dropping of non existent table without error
@@ -200,7 +200,7 @@ public class TestDropTable extends PlanTestBase {
.sqlQuery(String.format(DROP_TABLE_IF_EXISTS, nonExistentTableName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("Table [%s] not found", nonExistentTableName))
+ .baselineValues(false, String.format("Table [%s] not found", nonExistentTableName))
.go();
}
@@ -216,7 +216,7 @@ public class TestDropTable extends PlanTestBase {
.sqlQuery(String.format(DROP_TABLE_IF_EXISTS, viewName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("Table [%s] not found", viewName))
+ .baselineValues(false, String.format("Table [%s] not found", viewName))
.go();
} finally {
test(String.format(DROP_VIEW_IF_EXISTS, viewName));
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
new file mode 100644
index 0000000..f5d45b0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that session temporary locations disappear from the file system
+ * once the client or the drillbit that owns them is closed.
+ */
+@RunWith(JMockit.class)
+public class TemporaryTablesAutomaticDropTest extends BaseTestQuery {
+
+  private static final String SESSION_ID = "sessionId";
+
+  /**
+   * Mocks {@link UUID#randomUUID()} to return a fixed name-based UUID and restarts the test cluster.
+   * The tests below look for a directory named after that UUID.
+   */
+  @Before
+  public void init() throws Exception {
+    new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return fixedSessionUuid();
+      }
+    };
+    DrillConfig config = DrillConfig.create(cloneDefaultTestConfigProperties());
+    updateTestCluster(1, config);
+  }
+
+  /** The session temporary location must be gone after the client is replaced. */
+  @Test
+  public void testAutomaticDropWhenClientIsClosed() throws Exception {
+    String schemaLocation = getDfsTestTmpSchemaLocation();
+    File location = createAndCheckSessionTemporaryLocation("client_closed", schemaLocation);
+    updateClient("new_client");
+    assertFalse("Session temporary location should be absent", location.exists());
+  }
+
+  /** The session temporary location must be gone after the first drillbit is closed. */
+  @Test
+  public void testAutomaticDropWhenDrillbitIsClosed() throws Exception {
+    String schemaLocation = getDfsTestTmpSchemaLocation();
+    File location = createAndCheckSessionTemporaryLocation("drillbit_closed", schemaLocation);
+    bits[0].close();
+    assertFalse("Session temporary location should be absent", location.exists());
+  }
+
+  /**
+   * Creates temporary tables under two different schema locations within the same session
+   * and verifies both session directories are gone after the client is replaced.
+   * The original schema location is restored afterwards.
+   */
+  @Test
+  public void testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception {
+    File first = createAndCheckSessionTemporaryLocation("first_location", getDfsTestTmpSchemaLocation());
+    StoragePluginRegistry registry = getDrillbitContext().getStorage();
+    String secondSchemaLocation = TestUtilities.createTempDir();
+    try {
+      TestUtilities.updateDfsTestTmpSchemaLocation(registry, secondSchemaLocation);
+      File second = createAndCheckSessionTemporaryLocation("second_location", secondSchemaLocation);
+      updateClient("new_client");
+      assertFalse("First session temporary location should be absent", first.exists());
+      assertFalse("Second session temporary location should be absent", second.exists());
+    } finally {
+      TestUtilities.updateDfsTestTmpSchemaLocation(registry, getDfsTestTmpSchemaLocation());
+    }
+  }
+
+  /**
+   * Creates a temporary table and asserts that the session directory exists under the given schema location.
+   *
+   * @param suffix suffix appended to the temporary table name
+   * @param schemaLocation directory expected to contain the session directory
+   * @return the session temporary location
+   */
+  private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
+    String tableName = "temporary_table_automatic_drop_" + suffix;
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, tableName);
+    String sessionDirectoryName = fixedSessionUuid().toString();
+    File location = new File(schemaLocation, sessionDirectoryName);
+    assertTrue("Session temporary location should exist", location.exists());
+    return location;
+  }
+
+  /** Returns the name-based UUID derived from {@link #SESSION_ID}. */
+  private static UUID fixedSessionUuid() {
+    return UUID.nameUUIDFromBytes(SESSION_ID.getBytes());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
index 43d8d57..5bf55af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -143,7 +143,7 @@ public class TestBaseViewSupport extends BaseTestQuery {
.sqlQuery(String.format("DROP VIEW IF EXISTS %s", viewFullName))
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
+ .baselineValues(false, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
.go();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
new file mode 100644
index 0000000..93c8cad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for CREATE TEMPORARY TABLE AS (CTTAS): creation, name conflicts with existing tables and views,
+ * name resolution priority, dropping, and permissions of the created directories and files.
+ * <p>
+ * The cluster is started once per class, so temporary tables created by one test stay visible to the
+ * others. Each test therefore uses its own table name.
+ */
+@RunWith(JMockit.class)
+public class TestCTTAS extends BaseTestQuery {
+
+  private static final UUID session_id = UUID.nameUUIDFromBytes("sessionId".getBytes());
+  private static final String test_schema = "dfs_test";
+  private static final String temp2_wk = "tmp2";
+  private static final String temp2_schema = String.format("%s.%s", test_schema, temp2_wk);
+
+  private static FileSystem fs;
+  private static FsPermission expectedFolderPermission;
+  private static FsPermission expectedFilePermission;
+
+  /**
+   * Restarts the test cluster while {@link UUID#randomUUID()} is mocked to return {@link #session_id},
+   * adds workspace {@code tmp2} (backed by a new temp directory) to the {@code dfs_test} plugin,
+   * and initializes the file system and the permissions expected by {@link #checkPermission(String)}.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    MockUp<UUID> uuidMockUp = mockRandomUUID(session_id);
+    updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+    uuidMockUp.tearDown();
+
+    StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(test_schema).getConfig();
+    pluginConfig.workspaces.put(temp2_wk, new WorkspaceConfig(TestUtilities.createTempDir(), true, null));
+    pluginRegistry.createOrUpdate(test_schema, pluginConfig, true);
+
+    fs = FileSystem.get(new Configuration());
+    expectedFolderPermission = new FsPermission(StorageStrategy.TEMPORARY.getFolderPermission());
+    expectedFilePermission = new FsPermission(StorageStrategy.TEMPORARY.getFilePermission());
+  }
+
+  /**
+   * Mocks {@link UUID#randomUUID()} so that it always returns the given value.
+   *
+   * @param uuid value to return from {@code UUID.randomUUID()}
+   * @return the installed mock-up, which the caller may tear down
+   */
+  private static MockUp<UUID> mockRandomUUID(final UUID uuid) {
+    return new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return uuid;
+      }
+    };
+  }
+
+  /** Checks that CTTAS is accepted both without and with an explicit temporary workspace. */
+  @Test
+  public void testSyntax() throws Exception {
+    test("create TEMPORARY table temporary_keyword as select 1 from (values(1))");
+    // The workspace placeholder is required: without %s the TEMP_SCHEMA argument is silently ignored.
+    test("create TEMPORARY table %s.temporary_keyword_with_wk as select 1 from (values(1))", TEMP_SCHEMA);
+  }
+
+  /**
+   * For each of the parquet, json and csvh formats: creates a temporary table, checks the permissions
+   * of its location, and verifies it can be queried by bare name and by the qualified name.
+   * The {@code store.format} session option is reset afterwards.
+   */
+  @Test
+  public void testCreateTableWithDifferentStorageFormats() throws Exception {
+    List<String> storageFormats = Lists.newArrayList("parquet", "json", "csvh");
+
+    try {
+      for (String storageFormat : storageFormats) {
+        String temporaryTableName = "temp_" + storageFormat;
+        // Fix the generated UUID so that checkPermission can find the table directory by name.
+        mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+        test("alter session set `store.format`='%s'", storageFormat);
+        test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+        checkPermission(temporaryTableName);
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .baselineColumns("c1")
+            .baselineValues("A")
+            .go();
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .sqlBaselineQuery("select * from %s.%s", TEMP_SCHEMA, temporaryTableName)
+            .go();
+      }
+    } finally {
+      test("alter session reset `store.format`");
+    }
+  }
+
+  /** A temporary table must be queryable using its original, lower-case and upper-case names. */
+  @Test
+  public void testTemporaryTablesCaseInsensitivity() throws Exception {
+    String temporaryTableName = "tEmP_InSeNSiTiVe";
+    List<String> temporaryTableNames = Lists.newArrayList(
+        temporaryTableName,
+        temporaryTableName.toLowerCase(),
+        temporaryTableName.toUpperCase());
+
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    for (String tableName : temporaryTableNames) {
+      testBuilder()
+          .sqlQuery("select * from %s", tableName)
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("A")
+          .go();
+    }
+  }
+
+  /** Creates a partitioned temporary table and checks the permissions of its location. */
+  @Test
+  public void testPartitionByWithTemporaryTables() throws Exception {
+    String temporaryTableName = "temporary_table_with_partitions";
+    // Fix the generated UUID so that checkPermission can find the table directory by name.
+    mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+    test("create TEMPORARY table %s partition by (c1) as select * from (" +
+        "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName);
+    checkPermission(temporaryTableName);
+  }
+
+  /** Creating a temporary table in a workspace other than the default temporary one must fail. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception {
+    try {
+      String temporaryTableName = "temporary_table_outside_of_default_workspace";
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables are not allowed to be created outside of default temporary workspace [%s].",
+          TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating the same temporary table twice (no schema given) must fail on the second attempt. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_without_schema";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating a temporary table whose name differs only by case from an existing one must fail. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception {
+    // Use a name no other test in this class creates: the session is shared, so a reused name
+    // could make the first statement fail with a message carrying the lower-case name.
+    String temporaryTableName = "temporary_table_exists_case_insensitive";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase());
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", temporaryTableName.toUpperCase(), TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating the same temporary table twice (schema given explicitly) must fail on the second attempt. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_with_schema";
+    try {
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating a temporary table must fail when a persistent table with the same name exists in the temporary workspace. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenPersistentTableExists() throws Exception {
+    String persistentTableName = "persistent_table_exists";
+    try {
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, persistentTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", persistentTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", persistentTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating a temporary table must fail when a view with the same name exists in the temporary workspace. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenViewExists() throws Exception {
+    String viewName = "view_exists";
+    try {
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, viewName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", viewName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", viewName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating a persistent table in the temporary workspace must fail when a temporary table with that name exists. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreatePersistentTableWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_persistent";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+          " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /** Creating a view in the temporary workspace must fail when a temporary table with that name exists. */
+  @Test(expected = UserRemoteException.class)
+  public void testCreateViewWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_view";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A non-view table with given name [%s] already exists in schema [%s]",
+          temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /**
+   * With {@code dfs_test.tmp2} as the current schema, a temporary and a persistent table share one name.
+   * The bare name must resolve to the temporary table and the qualified name to the persistent one.
+   * After {@code drop table} on the bare name, the bare name must resolve to the persistent table.
+   */
+  @Test
+  public void testTemporaryAndPersistentTablesPriority() throws Exception {
+    String name = "temporary_and_persistent_table";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create table %s as select 'persistent_table' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+  }
+
+  /**
+   * With {@code dfs_test.tmp2} as the current schema, a temporary table and a view share one name.
+   * The bare name must resolve to the temporary table and the qualified name to the view.
+   * After {@code drop table} on the bare name, the bare name must resolve to the view.
+   */
+  @Test
+  public void testTemporaryTableAndViewPriority() throws Exception {
+    String name = "temporary_table_and_view";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create view %s as select 'view' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+  }
+
+  /** Using a temporary table inside a view definition must be rejected. */
+  @Test(expected = UserRemoteException.class)
+  public void testTemporaryTablesInViewDefinitions() throws Exception {
+    String temporaryTableName = "temporary_table_for_view_definition";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("create view %s.view_with_temp_table as select * from %s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables usage is disallowed. Used temporary table name: [%s]", temporaryTableName)));
+      throw e;
+    }
+  }
+
+  /** Dropping a temporary table by bare name must report success. */
+  @Test
+  public void testManualDropWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_without_schema";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s", temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  /** Dropping a temporary table by schema-qualified name must report success. */
+  @Test
+  public void testManualDropWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_with_schema";
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  /** {@code drop view if exists} on a temporary table name must report "view not found" with ok = false. */
+  @Test
+  public void testDropTemporaryTableAsViewWithoutException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_without_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop view if exists %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(false, String.format("View [%s] not found in schema [%s].",
+            temporaryTableName, TEMP_SCHEMA))
+        .go();
+  }
+
+  /** {@code drop view} on a temporary table name must fail with an "unknown view" validation error. */
+  @Test(expected = UserRemoteException.class)
+  public void testDropTemporaryTableAsViewWithException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_with_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("drop view %s.%s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  /**
+   * Asserts that exactly one directory exists for the given temporary table, that the directory has
+   * the expected folder permission, and that each file directly inside it has the expected file permission.
+   *
+   * @param tmpTableName temporary table name whose name-based UUID is the expected directory name
+   * @throws IOException if file system access fails
+   */
+  private void checkPermission(String tmpTableName) throws IOException {
+    File[] files = findTemporaryTableLocation(tmpTableName);
+    assertEquals("Only one directory should match temporary table name " + tmpTableName, 1, files.length);
+    Path tmpTablePath = new Path(files[0].toURI().getPath());
+    assertEquals("Directory permission should match",
+        expectedFolderPermission, fs.getFileStatus(tmpTablePath).getPermission());
+    RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(tmpTablePath, false);
+    while (fileIterator.hasNext()) {
+      assertEquals("File permission should match", expectedFilePermission, fileIterator.next().getPermission());
+    }
+  }
+
+  /**
+   * Asserts that the session temporary location exists with the expected folder permission and returns
+   * the directories inside it whose name equals the name-based UUID of the given table name.
+   *
+   * @param tableName temporary table name
+   * @return matching directories inside the session temporary location
+   * @throws IOException if file system access fails
+   */
+  private File[] findTemporaryTableLocation(String tableName) throws IOException {
+    File sessionTempLocation = new File(getDfsTestTmpSchemaLocation(), session_id.toString());
+    Path sessionTempLocationPath = new Path(sessionTempLocation.toURI().getPath());
+    assertTrue("Session temporary location must exist", fs.exists(sessionTempLocationPath));
+    assertEquals("Session temporary location permission should match",
+        expectedFolderPermission, fs.getFileStatus(sessionTempLocationPath).getPermission());
+    final String tableUUID = UUID.nameUUIDFromBytes(tableName.getBytes()).toString();
+    return sessionTempLocation.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File path) {
+        return path.isDirectory() && path.getName().equals(tableUUID);
+      }
+    });
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
new file mode 100644
index 0000000..6a377ec
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StorageStrategyTest {
+
+ private static final Configuration configuration = new Configuration();
+ private static final FsPermission full_permission = new FsPermission("777");
+ private static final StorageStrategy persistent_strategy = new StorageStrategy("775", "644", false);
+ private static final StorageStrategy temporary_strategy = new StorageStrategy("700", "600", true);
+ private FileSystem fs;
+
+ @Before
+ public void setup() throws Exception {
+ initFileSystem();
+ }
+
+ @Test
+ public void testPermissionAndDeleteOnExitFalseForFileWithParent() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 2, true);
+ Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+ Path createdParentPath = persistent_strategy.createFileAndApply(fs, file);
+
+ assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+ checkPathAndPermission(initialPath, file, true, 2, persistent_strategy);
+ checkDeleteOnExit(firstCreatedParentPath, true);
+ }
+
+ @Test
+ public void testPermissionAndDeleteOnExitTrueForFileWithParent() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 2, true);
+ Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+ Path createdParentPath = temporary_strategy.createFileAndApply(fs, file);
+
+ assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+ checkPathAndPermission(initialPath, file, true, 2, temporary_strategy);
+ checkDeleteOnExit(firstCreatedParentPath, false);
+ }
+
+ @Test
+ public void testPermissionAndDeleteOnExitFalseForFileOnly() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 0, true);
+
+ Path createdFile = persistent_strategy.createFileAndApply(fs, file);
+
+ assertEquals("Path should match", file, createdFile);
+ checkPathAndPermission(initialPath, file, true, 0, persistent_strategy);
+ checkDeleteOnExit(file, true);
+ }
+
+ @Test
+ public void testPermissionAndDeleteOnExitTrueForFileOnly() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 0, true);
+
+ Path createdFile = temporary_strategy.createFileAndApply(fs, file);
+
+ assertEquals("Path should match", file, createdFile);
+ checkPathAndPermission(initialPath, file, true, 0, temporary_strategy);
+ checkDeleteOnExit(file, false);
+ }
+
+ @Test(expected = IOException.class)
+ public void testFailureOnExistentFile() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 0, true);
+ fs.createNewFile(file);
+ assertTrue("File should exist", fs.exists(file));
+ try {
+ persistent_strategy.createFileAndApply(fs, file);
+ } catch (IOException e) {
+ assertEquals("Error message should match", String.format("File [%s] already exists on file system [%s].",
+ file.toUri().getPath(), fs.getUri()), e.getMessage());
+ throw e;
+ }
+ }
+
+ @Test
+ public void testCreatePathAndDeleteOnExitFalse() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+ Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+ Path createdParentPath = persistent_strategy.createPathAndApply(fs, resultPath);
+
+ assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+ checkPathAndPermission(initialPath, resultPath, false, 2, persistent_strategy);
+ checkDeleteOnExit(firstCreatedParentPath, true);
+ }
+
+ @Test
+ public void testCreatePathAndDeleteOnExitTrue() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+ Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+ Path createdParentPath = temporary_strategy.createPathAndApply(fs, resultPath);
+
+ assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+ checkPathAndPermission(initialPath, resultPath, false, 2, temporary_strategy);
+ checkDeleteOnExit(firstCreatedParentPath, false);
+ }
+
+ @Test
+ public void testCreateNoPath() throws Exception {
+ Path path = prepareStorageDirectory();
+
+ Path createdParentPath = temporary_strategy.createPathAndApply(fs, path);
+
+ assertNull("Path should be null", createdParentPath);
+ assertEquals("Permission should match", full_permission, fs.getFileStatus(path).getPermission());
+ }
+
+ @Test
+ public void testStrategyForExistingFile() throws Exception {
+ Path initialPath = prepareStorageDirectory();
+ Path file = addNLevelsAndFile(initialPath, 0, true);
+ fs.createNewFile(file);
+ fs.setPermission(file, full_permission);
+
+ assertTrue("File should exist", fs.exists(file));
+ assertEquals("Permission should match", full_permission, fs.getFileStatus(file).getPermission());
+
+ temporary_strategy.applyToFile(fs, file);
+
+ assertEquals("Permission should match", new FsPermission(temporary_strategy.getFilePermission()),
+ fs.getFileStatus(file).getPermission());
+ checkDeleteOnExit(file, false);
+ }
+
+ private Path prepareStorageDirectory() throws IOException {
+ File storageDirectory = Files.createTempDir();
+ storageDirectory.deleteOnExit();
+ Path path = new Path(storageDirectory.toURI().getPath());
+ fs.setPermission(path, full_permission);
+ return path;
+ }
+
+ private void initFileSystem() throws IOException {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+ fs = FileSystem.get(configuration);
+ }
+
+ private Path addNLevelsAndFile(Path initialPath, int levels, boolean addFile) {
+ Path resultPath = initialPath;
+ for (int i = 1; i <= levels; i++) {
+ resultPath = new Path(resultPath, "level" + i);
+ }
+ if (addFile) {
+ resultPath = new Path(resultPath, "test_file.txt");
+ }
+ return resultPath;
+ }
+
+ private void checkPathAndPermission(Path initialPath,
+ Path resultPath,
+ boolean isFile,
+ int levels,
+ StorageStrategy storageStrategy) throws IOException {
+
+ assertEquals("Path type should match", isFile, fs.isFile(resultPath));
+ assertEquals("Permission should match", full_permission, fs.getFileStatus(initialPath).getPermission());
+
+ if (isFile) {
+ assertEquals("Permission should match", new FsPermission(storageStrategy.getFilePermission()),
+ fs.getFileStatus(resultPath).getPermission());
+ }
+ Path startingPath = initialPath;
+ FsPermission folderPermission = new FsPermission(storageStrategy.getFolderPermission());
+ for (int i = 1; i <= levels; i++) {
+ startingPath = new Path(startingPath, "level" + i);
+ assertEquals("Permission should match", folderPermission, fs.getFileStatus(startingPath).getPermission());
+ }
+ }
+
+ private void checkDeleteOnExit(Path path, boolean isPresent) throws IOException {
+ assertTrue("Path should be present", fs.exists(path));
+ // close and open file system to check for path presence
+ initFileSystem();
+ assertEquals("Path existence flag should match", isPresent, fs.exists(path));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 7b977e2..35ca26b 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -9,7 +9,7 @@
writable: false
},
"tmp" : {
- location: "/tmp/drilltest",
+ location: "/tmp",
writable: true
}
},
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index fbacd23..cd97ab7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -176,6 +176,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
/**
+ * Closes all resources associated with the current session.
+ * The default implementation does nothing; subclasses may override it.
+ */
+ public void closeSession() {
+ }
+
+ /**
* Connection consumer wants to close connection. Initiate connection close
* and complete. This is a blocking call that ensures that the connection is
* closed before returning. As part of this call, the channel close handler
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index c360e51..cdb9c07 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -164,7 +164,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
- clientConnection.channelClosed(ex);
+ try {
+ clientConnection.closeSession();
+ } finally {
+ clientConnection.channelClosed(ex);
+ }
}
}
[5/5] drill git commit: DRILL-5164: Equi-join query results in
CompileException when inputs have large number of columns.
Posted by jn...@apache.org.
DRILL-5164: Equi-join query results in CompileException when inputs have large number of columns.
close apache/drill#711
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2af709f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2af709f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2af709f4
Branch: refs/heads/master
Commit: 2af709f43d01f341b2a52c6473ea49d6761fdc61
Parents: 8217697
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Tue Dec 27 16:20:37 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:58 2017 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/HashJoinBatch.java | 4 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 2 +
.../physical/impl/join/NestedLoopJoinBatch.java | 4 +-
.../exec/compile/TestLargeFileCompilation.java | 94 +++++++++++++++++---
4 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 23741b0..f1f81fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -439,7 +439,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
.arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
.arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
-
+ g.rotateBlock();
fieldId++;
}
}
@@ -475,7 +475,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
-
+ g.rotateBlock();
fieldId++;
outputFieldId++;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a9bb479..c351517 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -345,6 +345,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
.arg(copyLeftMapping.getValueReadIndex())
.arg(copyLeftMapping.getValueWriteIndex())
.arg(vvIn));
+ cg.rotateBlock();
++vectorId;
}
}
@@ -373,6 +374,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
.arg(copyRightMappping.getValueReadIndex())
.arg(copyRightMappping.getValueWriteIndex())
.arg(vvIn));
+ cg.rotateBlock();
++vectorId;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2e92c8d..bdd9f0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -247,7 +247,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId));
nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
-
+ nLJClassGenerator.rotateBlock();
fieldId++;
outputFieldId++;
}
@@ -270,7 +270,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
.arg(recordIndexWithinBatch)
.arg(outIndex)
.arg(inVV.component(batchIndex)));
-
+ nLJClassGenerator.rotateBlock();
fieldId++;
outputFieldId++;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 35bd4c9..8416d73 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -39,27 +39,33 @@ public class TestLargeFileCompilation extends BaseTestQuery {
private static final String LARGE_QUERY_SELECT_LIST;
+ private static final String QUERY_WITH_JOIN;
+
+ private static final String LARGE_TABLE_WRITER;
+
private static final int ITERATION_COUNT = Integer.valueOf(System.getProperty("TestLargeFileCompilation.iteration", "1"));
- private static final int NUM_PROJECT_COULMNS = 2000;
+ private static final int NUM_PROJECT_COLUMNS = 2000;
+
+ private static final int NUM_ORDERBY_COLUMNS = 500;
- private static final int NUM_ORDERBY_COULMNS = 500;
+ private static final int NUM_GROUPBY_COLUMNS = 225;
- private static final int NUM_GROUPBY_COULMNS = 225;
+ private static final int NUM_FILTER_COLUMNS = 150;
- private static final int NUM_FILTER_COULMNS = 150;
+ private static final int NUM_JOIN_TABLE_COLUMNS = 500;
static {
StringBuilder sb = new StringBuilder("select\n\t");
- for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+ for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
sb.append("c").append(i).append(", ");
}
sb.append("full_name\nfrom (select\n\t");
- for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+ for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
sb.append("employee_id+").append(i).append(" as c").append(i).append(", ");
}
sb.append("full_name\nfrom cp.`employee.json`)\ngroup by\n\t");
- for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+ for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
sb.append("c").append(i).append(", ");
}
LARGE_QUERY_GROUP_BY = sb.append("full_name").toString();
@@ -67,7 +73,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
static {
StringBuilder sb = new StringBuilder("select\n\t");
- for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+ for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
}
sb.append("full_name\nfrom cp.`employee.json`\n\n\t");
@@ -76,11 +82,11 @@ public class TestLargeFileCompilation extends BaseTestQuery {
static {
StringBuilder sb = new StringBuilder("select\n\t");
- for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+ for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
}
sb.append("full_name\nfrom cp.`employee.json`\norder by\n\t");
- for (int i = 0; i < NUM_ORDERBY_COULMNS; i++) {
+ for (int i = 0; i < NUM_ORDERBY_COLUMNS; i++) {
sb.append(" col").append(i).append(", ");
}
LARGE_QUERY_ORDER_BY = sb.append("full_name").toString();
@@ -91,18 +97,24 @@ public class TestLargeFileCompilation extends BaseTestQuery {
StringBuilder sb = new StringBuilder("select *\n")
.append("from cp.`employee.json`\n")
.append("where");
- for (int i = 0; i < NUM_FILTER_COULMNS; i++) {
+ for (int i = 0; i < NUM_FILTER_COLUMNS; i++) {
sb.append(" employee_id+").append(i).append(" < employee_id ").append(i%2==0?"OR":"AND");
}
LARGE_QUERY_FILTER = sb.append(" true") .toString();
}
static {
+ LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
+ LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
+ QUERY_WITH_JOIN = "select * from %1$s t1, %1$s t2 where t1.col1 = t2.col1";
+ }
+
+ private static String createTableWithColsCount(int columnsCount) {
StringBuilder sb = new StringBuilder("create table %s as (select \n");
- for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+ for (int i = 0; i < columnsCount; i++) {
sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
}
- LARGE_QUERY_WRITER = sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
+ return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
}
@Test
@@ -150,4 +162,60 @@ public class TestLargeFileCompilation extends BaseTestQuery {
testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST);
}
+
+ @Test
+ public void testHashJoin() throws Exception {
+ String tableName = "wide_table_hash_join";
+ try {
+ testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("alter session set `planner.enable_mergejoin` = false");
+ testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+ testNoResult("use dfs_test.tmp");
+ testNoResult(LARGE_TABLE_WRITER, tableName);
+ testNoResult(QUERY_WITH_JOIN, tableName);
+ } finally {
+ testNoResult("alter session reset `planner.enable_mergejoin`");
+ testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+ testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("drop table if exists %s", tableName);
+ }
+ }
+
+ @Test
+ public void testMergeJoin() throws Exception {
+ String tableName = "wide_table_merge_join";
+ try {
+ testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("alter session set `planner.enable_hashjoin` = false");
+ testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+ testNoResult("use dfs_test.tmp");
+ testNoResult(LARGE_TABLE_WRITER, tableName);
+ testNoResult(QUERY_WITH_JOIN, tableName);
+ } finally {
+ testNoResult("alter session reset `planner.enable_hashjoin`");
+ testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+ testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("drop table if exists %s", tableName);
+ }
+ }
+
+ @Test
+ public void testNestedLoopJoin() throws Exception {
+ String tableName = "wide_table_loop_join";
+ try {
+ testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false");
+ testNoResult("alter session set `planner.enable_hashjoin` = false");
+ testNoResult("alter session set `planner.enable_mergejoin` = false");
+ testNoResult("use dfs_test.tmp");
+ testNoResult(LARGE_TABLE_WRITER, tableName);
+ testNoResult(QUERY_WITH_JOIN, tableName);
+ } finally {
+ testNoResult("alter session reset `planner.enable_nljoin_for_scalar_only`");
+ testNoResult("alter session reset `planner.enable_hashjoin`");
+ testNoResult("alter session reset `planner.enable_mergejoin`");
+ testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ testNoResult("drop table if exists %s", tableName);
+ }
+ }
}
[2/5] drill git commit: DRILL-4956: Temporary tables support
Posted by jn...@apache.org.
DRILL-4956: Temporary tables support
close apache/drill#666
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bb29f19f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bb29f19f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bb29f19f
Branch: refs/heads/master
Commit: bb29f19ff8807fd07cdaa9e7110c1a003b3da15b
Parents: 8a4d7a9
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Nov 3 16:55:38 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:06 2017 -0800
----------------------------------------------------------------------
.../src/resources/drill-override-example.conf | 8 +-
.../src/main/codegen/includes/parserImpls.ftl | 9 +-
.../org/apache/drill/exec/ExecConstants.java | 7 +-
.../exec/physical/base/AbstractWriter.java | 17 +-
.../exec/physical/impl/WriterRecordBatch.java | 22 +-
.../logical/FileSystemCreateTableEntry.java | 18 +-
.../drill/exec/planner/sql/DrillSqlWorker.java | 9 +-
.../drill/exec/planner/sql/SchemaUtilites.java | 42 +-
.../drill/exec/planner/sql/SqlConverter.java | 141 +++++--
.../sql/handlers/CreateTableHandler.java | 111 ++++-
.../planner/sql/handlers/DropTableHandler.java | 65 ++-
.../planner/sql/handlers/SqlHandlerUtil.java | 49 ++-
.../exec/planner/sql/handlers/ViewHandler.java | 63 +--
.../sql/parser/CompoundIdentifierConverter.java | 4 +-
.../exec/planner/sql/parser/SqlCreateTable.java | 28 +-
.../apache/drill/exec/rpc/user/UserServer.java | 13 +-
.../apache/drill/exec/rpc/user/UserSession.java | 170 +++++++-
.../org/apache/drill/exec/server/Drillbit.java | 23 +-
.../apache/drill/exec/store/AbstractSchema.java | 21 +-
.../drill/exec/store/SchemaTreeProvider.java | 32 +-
.../drill/exec/store/StorageStrategy.java | 194 +++++++++
.../drill/exec/store/SubSchemaWrapper.java | 6 +-
.../drill/exec/store/dfs/FileSelection.java | 5 +-
.../exec/store/dfs/FileSystemSchemaFactory.java | 10 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 10 +-
.../drill/exec/store/dfs/easy/EasyWriter.java | 10 +-
.../exec/store/easy/json/JSONFormatPlugin.java | 4 +-
.../exec/store/easy/json/JsonRecordWriter.java | 24 +-
.../exec/store/easy/text/TextFormatPlugin.java | 4 +-
.../exec/store/parquet/ParquetRecordWriter.java | 64 ++-
.../drill/exec/store/parquet/ParquetWriter.java | 10 +-
.../exec/store/text/DrillTextRecordWriter.java | 31 +-
.../src/main/resources/drill-module.conf | 8 +-
.../java/org/apache/drill/BaseTestQuery.java | 3 +-
.../java/org/apache/drill/TestDropTable.java | 10 +-
.../user/TemporaryTablesAutomaticDropTest.java | 95 +++++
.../drill/exec/sql/TestBaseViewSupport.java | 4 +-
.../org/apache/drill/exec/sql/TestCTTAS.java | 422 +++++++++++++++++++
.../drill/exec/store/StorageStrategyTest.java | 222 ++++++++++
.../resources/bootstrap-storage-plugins.json | 2 +-
.../apache/drill/exec/rpc/RemoteConnection.java | 9 +-
.../java/org/apache/drill/exec/rpc/RpcBus.java | 8 +-
42 files changed, 1797 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index f9d27b3..3baac5e 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -184,7 +184,13 @@ drill.exec: {
# Set this property if custom absolute root should be used for remote directories
root: "/app/drill"
}
- }
+ },
+ # Settings for Temporary Tables.
+ # See https://gist.github.com/arina-ielchiieva/50158175867a18eee964b5ba36455fbf#file-temporarytablessupport-md.
+ # Temporary tables can be created ONLY in the default temporary workspace.
+ # The full workspace name must be given: schema and workspace separated by a dot (e.g. "dfs.tmp").
+ # The workspace MUST be file-based and writable. The workspace name is case-sensitive.
+ default_temporary_workspace: "dfs.tmp"
}
# Below SSL parameters need to be set for custom transport layer settings.
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 0017446..d9ceed9 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -214,8 +214,8 @@ SqlNode SqlDropView() :
}
/**
- * Parses a CTAS statement.
- * CREATE TABLE tblname [ (field1, field2, ...) ] AS select_statement.
+ * Parses a CTAS or CTTAS statement.
+ * CREATE [TEMPORARY] TABLE tblname [ (field1, field2, ...) ] AS select_statement.
*/
SqlNode SqlCreateTable() :
{
@@ -224,12 +224,14 @@ SqlNode SqlCreateTable() :
SqlNodeList fieldList;
SqlNodeList partitionFieldList;
SqlNode query;
+ boolean isTemporary = false;
}
{
{
partitionFieldList = SqlNodeList.EMPTY;
}
<CREATE> { pos = getPos(); }
+ ( <TEMPORARY> { isTemporary = true; } )?
<TABLE>
tblName = CompoundIdentifier()
fieldList = ParseOptionalFieldList("Table")
@@ -239,7 +241,8 @@ SqlNode SqlCreateTable() :
<AS>
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlCreateTable(pos, tblName, fieldList, partitionFieldList, query);
+ return new SqlCreateTable(pos,tblName, fieldList, partitionFieldList, query,
+ SqlLiteral.createBoolean(isTemporary, getPos()));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 740eb4b..e8cc75c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -128,6 +128,11 @@ public interface ExecConstants {
*/
String DRILL_TMP_DIR = "drill.tmp-dir";
+ /**
+ * Configuration key for the default temporary workspace. Temporary tables can be created ONLY in that workspace.
+ */
+ String DEFAULT_TEMPORARY_WORKSPACE = "drill.exec.default_temporary_workspace";
+
String OUTPUT_FORMAT_OPTION = "store.format";
OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
index af23d5f..6ba570b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,12 @@
*/
package org.apache.drill.exec.physical.base;
-public abstract class AbstractWriter extends AbstractSingle implements Writer{
+import org.apache.drill.exec.store.StorageStrategy;
+
+public abstract class AbstractWriter extends AbstractSingle implements Writer {
+
+ /** Storage strategy used when the table folder and its files are created. */
+ private StorageStrategy storageStrategy;
public AbstractWriter(PhysicalOperator child) {
super(child);
@@ -27,4 +32,12 @@ public abstract class AbstractWriter extends AbstractSingle implements Writer{
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
return physicalVisitor.visitWriter(this, value);
}
+
+ public void setStorageStrategy(StorageStrategy storageStrategy) {
+ this.storageStrategy = storageStrategy;
+ }
+
+ public StorageStrategy getStorageStrategy() {
+ return storageStrategy;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index e6c946c..939832b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -174,13 +174,25 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
schema = container.getSchema();
}
+ /** Clean up needs to be performed before closing writer. Partially written data will be removed. */
private void closeWriter() {
- if (recordWriter != null) {
+ if (recordWriter == null) {
+ return;
+ }
+
+ try {
+ recordWriter.cleanup();
+ } catch(IOException ex) {
+ context.fail(ex);
+ } finally {
try {
- recordWriter.cleanup();
+ if (!processed) {
+ recordWriter.abort();
+ }
+ } catch (IOException e) {
+ logger.error("Abort failed. There could be leftover output files.", e);
+ } finally {
recordWriter = null;
- } catch(IOException ex) {
- context.fail(ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 90eb05c..23ea23f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,9 +22,10 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.physical.base.Writer;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -34,7 +35,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.store.ischema.Records;
/**
* Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage.
@@ -47,28 +47,33 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
private FormatPlugin formatPlugin;
private String location;
private final List<String> partitionColumns;
+ private final StorageStrategy storageStrategy;
@JsonCreator
public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
@JsonProperty("formatConfig") FormatPluginConfig formatConfig,
@JsonProperty("location") String location,
@JsonProperty("partitionColumn") List<String> partitionColumns,
+ @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
@JacksonInject StoragePluginRegistry engineRegistry)
throws ExecutionSetupException {
this.storageConfig = storageConfig;
this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
this.location = location;
this.partitionColumns = partitionColumns;
+ this.storageStrategy = storageStrategy;
}
public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
FormatPlugin formatPlugin,
String location,
- List<String> partitionColumns) {
+ List<String> partitionColumns,
+ StorageStrategy storageStrategy) {
this.storageConfig = storageConfig;
this.formatPlugin = formatPlugin;
this.location = location;
this.partitionColumns = partitionColumns;
+ this.storageStrategy = storageStrategy;
}
@JsonProperty("storageConfig")
@@ -89,11 +94,14 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
formatPlugin.getName())).build(logger);
}
- return formatPlugin.getWriter(child, location, partitionColumns);
+ AbstractWriter writer = formatPlugin.getWriter(child, location, partitionColumns);
+ writer.setStorageStrategy(storageStrategy);
+ return writer;
}
@Override
public List<String> getPartitionColumns() {
return partitionColumns;
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 76529d4..0ad3944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -58,12 +58,7 @@ public class DrillSqlWorker {
public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
throws ForemanSetupException {
- final SqlConverter parser = new SqlConverter(
- context.getPlannerSettings(),
- context.getNewDefaultSchema(),
- context.getDrillOperatorTable(),
- (UdfUtilities) context,
- context.getFunctionRegistry());
+ final SqlConverter parser = new SqlConverter(context);
injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
final SqlNode sqlNode = parser.parse(sql);
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index 085f808..20c92c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,8 +22,12 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import java.util.Collections;
import java.util.List;
@@ -116,6 +120,11 @@ public class SchemaUtilites {
return SCHEMA_PATH_JOINER.join(getSchemaPathAsList(schema));
}
+ /** Utility method to get the schema path as a single string for the given list of schema path elements. */
+ public static String getSchemaPath(List<String> schemaPath) {
+ return SCHEMA_PATH_JOINER.join(schemaPath);
+ }
+
/** Utility method to get the schema path as list for given schema instance. */
public static List<String> getSchemaPathAsList(SchemaPlus schema) {
if (isRootSchema(schema)) {
@@ -177,4 +186,35 @@ public class SchemaUtilites {
return drillSchema;
}
+
+ /**
+ * Looks in schema tree for default temporary workspace instance.
+ * Makes sure that temporary workspace is mutable and file-based
+ * (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}).
+ *
+ * @param defaultSchema default schema
+ * @param config drill config
+ * @return default temporary workspace
+ */
+ public static AbstractSchema getTemporaryWorkspace(SchemaPlus defaultSchema, DrillConfig config) {
+ List<String> temporarySchemaPath = Lists.newArrayList(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE));
+ AbstractSchema temporarySchema = resolveToMutableDrillSchema(defaultSchema, temporarySchemaPath);
+ if (!(temporarySchema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
+ DrillRuntimeException.format("Temporary workspace [%s] must be file-based, instance of " +
+ "WorkspaceSchemaFactory.WorkspaceSchema", temporarySchemaPath);
+ }
+ return temporarySchema;
+ }
+
+ /**
+ * Checks that passed schema path is the same as temporary workspace path.
+ * Check is case-sensitive.
+ *
+ * @param schemaPath schema path
+ * @param config drill config
+ * @return true if schema path corresponds to temporary workspace, false otherwise
+ */
+ public static boolean isTemporaryWorkspace(String schemaPath, DrillConfig config) {
+ return schemaPath.equals(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE));
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 0c3c6a0..28196c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,10 +21,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.CalciteSchemaImpl;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -33,6 +35,7 @@ import org.apache.calcite.plan.RelOptCostFactory;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
@@ -41,17 +44,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.AggregatingSelectScope;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorException;
@@ -61,18 +61,21 @@ import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.FunctionNotFoundException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.logical.DrillConstExecutor;
import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
import com.google.common.base.Joiner;
+import org.apache.drill.exec.rpc.user.UserSession;
/**
* Class responsible for managing parsing, validation and toRel conversion for sql statements.
@@ -86,7 +89,7 @@ public class SqlConverter {
private final SqlParser.Config parserConfig;
// Allow the default config to be modified using immutable configs
private SqlToRelConverter.Config sqlToRelConverterConfig;
- private final CalciteCatalogReader catalog;
+ private final DrillCalciteCatalogReader catalog;
private final PlannerSettings settings;
private final SchemaPlus rootSchema;
private final SchemaPlus defaultSchema;
@@ -96,35 +99,42 @@ public class SqlConverter {
private final boolean isInnerQuery;
private final UdfUtilities util;
private final FunctionImplementationRegistry functions;
+ private final String temporarySchema;
+ private final UserSession session;
+ private final DrillConfig drillConfig;
private String sql;
private VolcanoPlanner planner;
- public SqlConverter(PlannerSettings settings, SchemaPlus defaultSchema,
- final SqlOperatorTable operatorTable, UdfUtilities util, FunctionImplementationRegistry functions) {
- this.settings = settings;
- this.util = util;
- this.functions = functions;
+ public SqlConverter(QueryContext context) {
+ this.settings = context.getPlannerSettings();
+ this.util = (UdfUtilities) context;
+ this.functions = context.getFunctionRegistry();
this.parserConfig = new ParserConfig();
this.sqlToRelConverterConfig = new SqlToRelConverterConfig();
this.isInnerQuery = false;
this.typeFactory = new JavaTypeFactoryImpl(DRILL_TYPE_SYSTEM);
- this.defaultSchema = defaultSchema;
+ this.defaultSchema = context.getNewDefaultSchema();
this.rootSchema = rootSchema(defaultSchema);
- this.catalog = new CalciteCatalogReader(
+ this.temporarySchema = context.getConfig().getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE);
+ this.session = context.getSession();
+ this.drillConfig = context.getConfig();
+ this.catalog = new DrillCalciteCatalogReader(
CalciteSchemaImpl.from(rootSchema),
parserConfig.caseSensitive(),
CalciteSchemaImpl.from(defaultSchema).path(null),
- typeFactory);
- this.opTab = new ChainedSqlOperatorTable(Arrays.asList(operatorTable, catalog));
+ typeFactory,
+ drillConfig,
+ session);
+ this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT);
validator.setIdentifierExpansion(true);
}
private SqlConverter(SqlConverter parent, SchemaPlus defaultSchema, SchemaPlus rootSchema,
- CalciteCatalogReader catalog) {
+ DrillCalciteCatalogReader catalog) {
this.parserConfig = parent.parserConfig;
this.sqlToRelConverterConfig = parent.sqlToRelConverterConfig;
this.defaultSchema = defaultSchema;
@@ -139,6 +149,9 @@ public class SqlConverter {
this.opTab = parent.opTab;
this.planner = parent.planner;
this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT);
+ this.temporarySchema = parent.temporarySchema;
+ this.session = parent.session;
+ this.drillConfig = parent.drillConfig;
validator.setIdentifierExpansion(true);
}
@@ -203,6 +216,11 @@ public class SqlConverter {
return defaultSchema;
}
+ /** Disallow temporary tables presence in sql statement (ex: in view definitions) */
+ public void disallowTemporaryTables() {
+ catalog.disallowTemporaryTables();
+ }
+
private class DrillValidator extends SqlValidatorImpl {
private final Set<SqlValidatorScope> identitySet = Sets.newIdentityHashSet();
@@ -272,26 +290,27 @@ public class SqlConverter {
public Expander() {
}
- public RelNode expandView(
- RelDataType rowType,
- String queryString,
- List<String> schemaPath) {
- SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema,
- catalog.withSchemaPath(schemaPath));
+ public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+ final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
+ CalciteSchemaImpl.from(rootSchema),
+ parserConfig.caseSensitive(),
+ schemaPath,
+ typeFactory,
+ drillConfig,
+ session);
+ final SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema, catalogReader);
return expandView(queryString, parser);
}
@Override
- public RelNode expandView(
- RelDataType rowType,
- String queryString,
- SchemaPlus rootSchema, // new root schema
- List<String> schemaPath) {
- final CalciteCatalogReader catalogReader = new CalciteCatalogReader(
- CalciteSchemaImpl.from(rootSchema),
+ public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
+ final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
+ CalciteSchemaImpl.from(rootSchema), // new root schema
parserConfig.caseSensitive(),
schemaPath,
- typeFactory);
+ typeFactory,
+ drillConfig,
+ session);
SchemaPlus schema = rootSchema;
for (String s : schemaPath) {
SchemaPlus newSchema = schema.getSubSchema(s);
@@ -447,4 +466,66 @@ public class SqlConverter {
return node;
}
}
+
+ /**
+ * Extension of {@link CalciteCatalogReader} to add ability to check for temporary tables first
+ * if schema is not indicated near table name during query parsing
+ * or indicated workspace is default temporary workspace.
+ */
+ private class DrillCalciteCatalogReader extends CalciteCatalogReader {
+
+ private final DrillConfig drillConfig;
+ private final UserSession session;
+ private boolean allowTemporaryTables;
+
+ DrillCalciteCatalogReader(CalciteSchema rootSchema,
+ boolean caseSensitive,
+ List<String> defaultSchema,
+ JavaTypeFactory typeFactory,
+ DrillConfig drillConfig,
+ UserSession session) {
+ super(rootSchema, caseSensitive, defaultSchema, typeFactory);
+ this.drillConfig = drillConfig;
+ this.session = session;
+ this.allowTemporaryTables = true;
+ }
+
+ /** Disallow temporary tables presence in sql statement (ex: in view definitions) */
+ public void disallowTemporaryTables() {
+ this.allowTemporaryTables = false;
+ }
+
+ /**
+ * If schema is not indicated (only one element in the list) or schema is default temporary workspace,
+ * we need to check among session temporary tables first in default temporary workspace.
+ * If temporary table is found and temporary tables usage is allowed, its table instance will be returned,
+ * otherwise search will be conducted in original workspace.
+ *
+ * @param names list of schema and table names, table name is always the last element
+ * @return table instance, null otherwise
+ * @throws UserException if temporary tables usage is disallowed
+ */
+ @Override
+ public RelOptTableImpl getTable(final List<String> names) {
+ RelOptTableImpl temporaryTable = null;
+ String schemaPath = SchemaUtilites.getSchemaPath(names.subList(0, names.size() - 1));
+ if (names.size() == 1 || SchemaUtilites.isTemporaryWorkspace(schemaPath, drillConfig)) {
+ String temporaryTableName = session.resolveTemporaryTableName(names.get(names.size() - 1));
+ if (temporaryTableName != null) {
+ List<String> temporaryNames = Lists.newArrayList(temporarySchema, temporaryTableName);
+ temporaryTable = super.getTable(temporaryNames);
+ }
+ }
+ if (temporaryTable != null) {
+ if (allowTemporaryTables) {
+ return temporaryTable;
+ }
+ throw UserException
+ .validationError()
+ .message("Temporary tables usage is disallowed. Used temporary table name: %s.", names)
+ .build(logger);
+ }
+ return super.getTable(names);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index b6ffde6..12c72c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,13 +31,17 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -67,43 +71,54 @@ public class CreateTableHandler extends DefaultSqlHandler {
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class);
- final String newTblName = sqlCreateTable.getName();
+ String originalTableName = sqlCreateTable.getName();
final ConvertedRelNode convertedRelNode = validateAndConvert(sqlCreateTable.getQuery());
final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
-
final RelNode newTblRelNode =
SqlHandlerUtil.resolveNewTableRel(false, sqlCreateTable.getFieldNames(), validatedRowType, queryRelNode);
- final AbstractSchema drillSchema =
- SchemaUtilites.resolveToMutableDrillSchema(config.getConverter().getDefaultSchema(),
- sqlCreateTable.getSchemaPath());
- final String schemaPath = drillSchema.getFullSchemaName();
+ final DrillConfig drillConfig = context.getConfig();
+ final AbstractSchema drillSchema = resolveSchema(sqlCreateTable, config.getConverter().getDefaultSchema(), drillConfig);
- if (SqlHandlerUtil.getTableFromSchema(drillSchema, newTblName) != null) {
- throw UserException.validationError()
- .message("A table or view with given name [%s] already exists in schema [%s]", newTblName, schemaPath)
- .build(logger);
- }
+ checkDuplicatedObjectExistence(drillSchema, originalTableName, drillConfig, context.getSession());
- final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode, sqlCreateTable.getPartitionColumns());
+ final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode,
+ sqlCreateTable.getPartitionColumns());
log("Calcite", newTblRelNodeWithPCol, logger, null);
-
// Convert the query to Drill Logical plan and insert a writer operator on top.
- DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTblName, sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType());
+ StorageStrategy storageStrategy = sqlCreateTable.isTemporary() ?
+ StorageStrategy.TEMPORARY : StorageStrategy.PERSISTENT;
+
+ // If we are creating temporary table, initial table name will be replaced with generated table name.
+ // Generated table name is unique, UUID.randomUUID() is used for its generation.
+ // Original table name is stored in temporary tables cache, so it can be replaced with the generated one during querying.
+ String newTableName = sqlCreateTable.isTemporary() ?
+ context.getSession().registerTemporaryTable(drillSchema, originalTableName) : originalTableName;
+
+ DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTableName,
+ sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType(), storageStrategy);
Prel prel = convertToPrel(drel, newTblRelNode.getRowType(), sqlCreateTable.getPartitionColumns());
logAndSetTextPlan("Drill Physical", prel, logger);
PhysicalOperator pop = convertToPop(prel);
PhysicalPlan plan = convertToPlan(pop);
log("Drill Plan", plan, logger);
+ String message = String.format("Creating %s table [%s].",
+ sqlCreateTable.isTemporary() ? "temporary" : "persistent", originalTableName);
+ logger.info(message);
return plan;
}
- private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName, List<String> partitionColumns, RelDataType queryRowType)
+ private DrillRel convertToDrel(RelNode relNode,
+ AbstractSchema schema,
+ String tableName,
+ List<String> partitionColumns,
+ RelDataType queryRowType,
+ StorageStrategy storageStrategy)
throws RelConversionException, SqlUnsupportedException {
final DrillRel convertedRelNode = convertToDrel(relNode);
@@ -114,7 +129,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
final RelTraitSet traits = convertedRelNode.getCluster().traitSet().plus(DrillRel.DRILL_LOGICAL);
final DrillWriterRel writerRel = new DrillWriterRel(convertedRelNode.getCluster(),
- traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns));
+ traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns, storageStrategy));
return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
}
@@ -186,7 +201,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
Collections.singletonList( (RelNode) projectUnderWriter));
} else {
- // find list of partiiton columns.
+ // find list of partition columns.
final List<RexNode> partitionColumnExprs = Lists.newArrayListWithExpectedSize(partitionColumns.size());
for (final String colName : partitionColumns) {
final RelDataTypeField field = childRowType.getField(colName, false, false);
@@ -242,4 +257,62 @@ public class CreateTableHandler extends DefaultSqlHandler {
return node;
}
+ /**
+ * Resolves schema taking into account type of table being created.
+ * If schema path wasn't indicated in sql call and table type to be created is temporary
+ * returns temporary workspace.
+ *
+ * If schema path is indicated, resolves to mutable drill schema.
+ * If the table to be created is temporary, also checks that the resolved schema is the temporary workspace,
+ * since temporary tables are allowed to be created only in the temporary workspace.
+ *
+ * @param sqlCreateTable create table call
+ * @param defaultSchema default schema
+ * @param config drill config
+ * @return resolved schema
+ * @throws UserException if attempted to create temporary table outside of temporary workspace
+ */
+ private AbstractSchema resolveSchema(SqlCreateTable sqlCreateTable, SchemaPlus defaultSchema, DrillConfig config) {
+ if (sqlCreateTable.isTemporary() && sqlCreateTable.getSchemaPath().size() == 0) {
+ return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config);
+ } else {
+ AbstractSchema resolvedSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, sqlCreateTable.getSchemaPath());
+ boolean isTemporaryWorkspace = SchemaUtilites.isTemporaryWorkspace(resolvedSchema.getFullSchemaName(), config);
+
+ if (sqlCreateTable.isTemporary() && !isTemporaryWorkspace) {
+ throw UserException
+ .validationError()
+ .message(String.format("Temporary tables are not allowed to be created " +
+ "outside of default temporary workspace [%s].", config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE)))
+ .build(logger);
+ }
+ return resolvedSchema;
+ }
+ }
+
+ /**
+ * Checks if any object (persistent table / temporary table / view)
+ * with the same name as table to be created exists in indicated schema.
+ *
+ * @param drillSchema schema where table will be created
+ * @param tableName table name
+ * @param config drill config
+ * @param userSession current user session
+ * @throws UserException if duplicate is found
+ */
+ private void checkDuplicatedObjectExistence(AbstractSchema drillSchema,
+ String tableName,
+ DrillConfig config,
+ UserSession userSession) {
+ String schemaPath = drillSchema.getFullSchemaName();
+ boolean isTemporaryTable = userSession.isTemporaryTable(drillSchema, config, tableName);
+
+ if (isTemporaryTable || SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) {
+ throw UserException
+ .validationError()
+ .message("A table or view with given name [%s] already exists in schema [%s]",
+ tableName, schemaPath)
+ .build(logger);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
index 517c183..a9895db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,19 +18,21 @@
package org.apache.drill.exec.planner.sql.handlers;
import java.io.IOException;
+import java.util.List;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
// SqlHandler for dropping a table.
@@ -46,6 +48,7 @@ public class DropTableHandler extends DefaultSqlHandler {
* Function resolves the schema and invokes the drop method
* (while IF EXISTS statement is used function invokes the drop method only if table exists).
* Raises an exception if the schema is immutable.
+ *
* @param sqlNode - SqlDropTable (SQL parse tree of drop table [if exists] query)
* @return - Single row indicating drop succeeded or table is not found while IF EXISTS statement is used,
* raise exception otherwise
@@ -55,35 +58,51 @@ public class DropTableHandler extends DefaultSqlHandler {
*/
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException {
-
SqlDropTable dropTableNode = ((SqlDropTable) sqlNode);
- SqlIdentifier tableIdentifier = dropTableNode.getTableIdentifier();
-
+ String originalTableName = dropTableNode.getName();
SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
- AbstractSchema drillSchema = null;
+ List<String> tableSchema = dropTableNode.getSchema();
+ DrillConfig drillConfig = context.getConfig();
+ UserSession session = context.getSession();
- if (tableIdentifier != null) {
- drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, dropTableNode.getSchema());
- }
-
- String tableName = dropTableNode.getName();
- if (drillSchema == null) {
- throw UserException.validationError()
- .message("Invalid table_name [%s]", tableName)
- .build(logger);
- }
+ AbstractSchema temporarySchema = resolveToTemporarySchema(tableSchema, defaultSchema, drillConfig);
+ boolean isTemporaryTable = session.isTemporaryTable(temporarySchema, drillConfig, originalTableName);
- if (dropTableNode.checkTableExistence()) {
- final Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
+ if (isTemporaryTable) {
+ session.removeTemporaryTable(temporarySchema, originalTableName);
+ } else {
+ AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema);
+ Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, originalTableName);
if (tableToDrop == null || tableToDrop.getJdbcTableType() != Schema.TableType.TABLE) {
- return DirectPlan.createDirectPlan(context, true,
- String.format("Table [%s] not found", tableName));
+ if (dropTableNode.checkTableExistence()) {
+ return DirectPlan.createDirectPlan(context, false, String.format("Table [%s] not found", originalTableName));
+ } else {
+ throw UserException.validationError().message("Table [%s] not found", originalTableName).build(logger);
+ }
}
+ SqlHandlerUtil.dropTableFromSchema(drillSchema, originalTableName);
}
- drillSchema.dropTable(tableName);
+ String message = String.format("%s [%s] dropped", isTemporaryTable ? "Temporary table" : "Table", originalTableName);
+ logger.info(message);
+ return DirectPlan.createDirectPlan(context, true, message);
+ }
- return DirectPlan.createDirectPlan(context, true,
- String.format("Table [%s] %s", tableName, "dropped"));
+ /**
+ * If table schema is not indicated in sql call, returns temporary workspace.
+ * If schema is indicated, resolves to mutable table schema.
+ *
+ * @param tableSchema table schema
+ * @param defaultSchema default schema
+ * @param config drill config
+ * @return resolved schema
+ */
+ private AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) {
+ if (tableSchema.size() == 0) {
+ return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config);
+ } else {
+ return SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema);
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index ca7a510..04930a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,31 +22,24 @@ import com.google.common.collect.Sets;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.TypedSqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
-import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
-import org.apache.drill.exec.planner.sql.DirectPlan;
-import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.drill.exec.store.ischema.Records;
+import java.io.IOException;
import java.util.AbstractList;
import java.util.HashSet;
import java.util.List;
@@ -235,4 +228,42 @@ public class SqlHandlerUtil {
writer.keyword(")");
}
+ /**
+ * Drops table from schema.
+ * If drop has failed makes concurrency check: checks if table still exists.
+ * If table exists, rethrows the exception raised by the drop (expected to be a {@link UserException})
+ * since drop was unsuccessful, otherwise assumes that other user had dropped the table
+ * and exits without error.
+ *
+ * @param drillSchema drill schema
+ * @param tableName table name
+ */
+ public static void dropTableFromSchema(AbstractSchema drillSchema, String tableName) {
+ try {
+ drillSchema.dropTable(tableName);
+ } catch (Exception e) {
+ // the failure is reported only when the table is still present in the schema
+ if (SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Drops view from schema.
+ * If drop has failed makes concurrency check: checks if view still exists.
+ * If view exists, rethrows the exception raised by the drop (expected to be a {@link UserException})
+ * since drop was unsuccessful, otherwise assumes that other user had dropped the view
+ * and exits without error.
+ *
+ * @param drillSchema drill schema
+ * @param viewName view name
+ * @throws IOException if the drop failed with an I/O error and the view still exists
+ */
+ public static void dropViewFromSchema(AbstractSchema drillSchema, String viewName) throws IOException {
+ try {
+ drillSchema.dropView(viewName);
+ } catch (Exception e) {
+ // the failure is reported only when the view is still present in the schema
+ if (SqlHandlerUtil.getTableFromSchema(drillSchema, viewName) != null) {
+ throw e;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
index b8396e6..495e8b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
@@ -62,9 +62,10 @@ public abstract class ViewHandler extends DefaultSqlHandler {
final String newViewName = createView.getName();
+ // Disallow temporary tables usage in view definition
+ config.getConverter().disallowTemporaryTables();
// Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below.
final String viewSql = createView.getQuery().toString();
-
final ConvertedRelNode convertedRelNode = validateAndConvert(createView.getQuery());
final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
@@ -74,36 +75,50 @@ public abstract class ViewHandler extends DefaultSqlHandler {
final SchemaPlus defaultSchema = context.getNewDefaultSchema();
final AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, createView.getSchemaPath());
- final String schemaPath = drillSchema.getFullSchemaName();
final View view = new View(newViewName, viewSql, newViewRelNode.getRowType(),
SchemaUtilites.getSchemaPathAsList(defaultSchema));
- final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, newViewName);
-
- if (existingTable != null) {
- if (existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
- // existing table is not a view
- throw UserException.validationError()
- .message("A non-view table with given name [%s] already exists in schema [%s]",
- newViewName, schemaPath)
- .build(logger);
- }
-
- if (existingTable.getJdbcTableType() == Schema.TableType.VIEW && !createView.getReplace()) {
- // existing table is a view and create view has no "REPLACE" clause
- throw UserException.validationError()
- .message("A view with given name [%s] already exists in schema [%s]",
- newViewName, schemaPath)
- .build(logger);
- }
- }
+ validateViewCreationPossibility(drillSchema, createView, context);
final boolean replaced = drillSchema.createView(view);
final String summary = String.format("View '%s' %s successfully in '%s' schema",
- createView.getName(), replaced ? "replaced" : "created", schemaPath);
+ createView.getName(), replaced ? "replaced" : "created", drillSchema.getFullSchemaName());
return DirectPlan.createDirectPlan(context, true, summary);
}
+
+ /**
+ * Validates if view can be created in indicated schema:
+ * checks if object (persistent / temporary table) with the same name exists
+ * or if view with the same name exists but replace flag is not set.
+ *
+ * @param drillSchema schema where views will be created
+ * @param view create view call
+ * @param context query context
+ * @throws UserException if view cannot be created in indicated schema
+ */
+ private void validateViewCreationPossibility(AbstractSchema drillSchema, SqlCreateView view, QueryContext context) {
+ final String schemaPath = drillSchema.getFullSchemaName();
+ final String viewName = view.getName();
+ final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName);
+
+ if ((existingTable != null && existingTable.getJdbcTableType() != Schema.TableType.VIEW) ||
+ context.getSession().isTemporaryTable(drillSchema, context.getConfig(), viewName)) {
+ // name is taken either by an existing non-view table or by a temporary table of this session
+ throw UserException
+ .validationError()
+ .message("A non-view table with given name [%s] already exists in schema [%s]", viewName, schemaPath)
+ .build(logger);
+ }
+
+ if ((existingTable != null && existingTable.getJdbcTableType() == Schema.TableType.VIEW) && !view.getReplace()) {
+ // existing table is a view and create view has no "REPLACE" clause
+ throw UserException
+ .validationError()
+ .message("A view with given name [%s] already exists in schema [%s]", viewName, schemaPath)
+ .build(logger);
+ }
+ }
}
/** Handler for Drop View [If Exists] DDL command. */
@@ -124,7 +139,7 @@ public abstract class ViewHandler extends DefaultSqlHandler {
final Table viewToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName);
if (dropView.checkViewExistence()) {
if (viewToDrop == null || viewToDrop.getJdbcTableType() != Schema.TableType.VIEW){
- return DirectPlan.createDirectPlan(context, true,
+ return DirectPlan.createDirectPlan(context, false,
String.format("View [%s] not found in schema [%s].", viewName, schemaPath));
}
} else {
@@ -139,7 +154,7 @@ public abstract class ViewHandler extends DefaultSqlHandler {
}
}
- drillSchema.dropView(viewName);
+ SqlHandlerUtil.dropViewFromSchema(drillSchema, viewName);
return DirectPlan.createDirectPlan(context, true,
String.format("View [%s] deleted successfully from schema [%s].", viewName, schemaPath));
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index 53e3cd5..db934e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -161,7 +161,7 @@ public class CompoundIdentifierConverter extends SqlShuttle {
//SqlNode offset,
//SqlNode fetch,
rules.put(SqlSelect.class, R(D, E, D, E, E, E, E, E, D, D));
- rules.put(SqlCreateTable.class, R(D, D, D, E));
+ rules.put(SqlCreateTable.class, R(D, D, D, E, D));
rules.put(SqlCreateView.class, R(D, E, E, D));
rules.put(SqlDescribeTable.class, R(D, D, E));
rules.put(SqlDropView.class, R(D, D));
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index 5835b10..bba60b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -48,8 +48,13 @@ public class SqlCreateTable extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.OTHER) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
- Preconditions.checkArgument(operands.length == 4, "SqlCreateTable.createCall() has to get 4 operands!");
- return new SqlCreateTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], (SqlNodeList) operands[2], operands[3]);
+ Preconditions.checkArgument(operands.length == 5, "SqlCreateTable.createCall() has to get 5 operands!");
+ return new SqlCreateTable(pos,
+ (SqlIdentifier) operands[0],
+ (SqlNodeList) operands[1],
+ (SqlNodeList) operands[2],
+ operands[3],
+ (SqlLiteral) operands[4]);
}
};
@@ -57,13 +62,20 @@ public class SqlCreateTable extends DrillSqlCall {
private final SqlNodeList fieldList;
private final SqlNodeList partitionColumns;
private final SqlNode query;
-
- public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNodeList partitionColumns, SqlNode query) {
+ private final SqlLiteral isTemporary;
+
+ public SqlCreateTable(SqlParserPos pos,
+ SqlIdentifier tblName,
+ SqlNodeList fieldList,
+ SqlNodeList partitionColumns,
+ SqlNode query,
+ SqlLiteral isTemporary) {
super(pos);
this.tblName = tblName;
this.fieldList = fieldList;
this.partitionColumns = partitionColumns;
this.query = query;
+ this.isTemporary = isTemporary;
}
@Override
@@ -78,12 +90,16 @@ public class SqlCreateTable extends DrillSqlCall {
ops.add(fieldList);
ops.add(partitionColumns);
ops.add(query);
+ ops.add(isTemporary);
return ops;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("CREATE");
+ if (isTemporary.booleanValue()) {
+ writer.keyword("TEMPORARY");
+ }
writer.keyword("TABLE");
tblName.unparse(writer, leftPrec, rightPrec);
if (fieldList.size() > 0) {
@@ -142,4 +158,6 @@ public class SqlCreateTable extends DrillSqlCall {
public SqlNode getQuery() { return query; }
+ public boolean isTemporary() { return isTemporary.booleanValue(); }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 4e17249..281b124 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -299,6 +299,17 @@ public class UserServer extends BasicServer<RpcType, UserClientConnectionImpl> {
public SocketAddress getRemoteAddress() {
return getChannel().remoteAddress();
}
+
+ /** Closes the user session of this connection by delegating to {@code session.close()}. */
+ @Override
+ public void closeSession() {
+ session.close();
+ }
+
+ /**
+ * Closes the user session and then the connection itself.
+ * The connection is closed even if session cleanup throws; the exception is still propagated.
+ */
+ @Override
+ public void close() {
+ try {
+ closeSession();
+ } finally {
+ // the connection must be released regardless of the session cleanup outcome
+ super.close();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 3bf9051..c3639d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,9 +17,14 @@
*/
package org.apache.drill.exec.rpc.user;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
@@ -27,9 +32,13 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -37,8 +46,14 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.SessionOptionManager;
import com.google.common.collect.Maps;
-
-public class UserSession {
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class UserSession implements Closeable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
public static final String SCHEMA = "schema";
@@ -54,18 +69,43 @@ public class UserSession {
private Map<String, String> properties;
private OptionManager sessionOptions;
private final AtomicInteger queryCount;
+ private final String sessionId;
+
+ /** Stores list of temporary tables, key is original table name converted to lower case to achieve case-insensitivity,
+ * value is generated table name. **/
+ private final ConcurrentMap<String, String> temporaryTables;
+ /** Stores list of session temporary locations, key is path to location, value is file system associated with location. **/
+ private final ConcurrentMap<Path, FileSystem> temporaryLocations;
+
+ /**
+ * On session close deletes all session temporary locations recursively and clears temporary locations list.
+ * Deletion is best-effort: a failure for one location is logged as a warning and
+ * does not prevent the attempt to delete the remaining locations.
+ */
+ @Override
+ public void close() {
+ for (Map.Entry<Path, FileSystem> entry : temporaryLocations.entrySet()) {
+ Path path = entry.getKey();
+ FileSystem fs = entry.getValue();
+ try {
+ // delete reports failure through its return value as well as through exceptions
+ if (fs.delete(path, true)) {
+ logger.info("Deleted session temporary location [{}] from file system [{}]",
+ path.toUri().getPath(), fs.getUri());
+ } else {
+ logger.warn("Session temporary location [{}] was not deleted from file system [{}]",
+ path.toUri().getPath(), fs.getUri());
+ }
+ } catch (Exception e) {
+ // the trailing throwable argument makes the logger record the stack trace as well
+ logger.warn("Error during session temporary location [{}] deletion from file system [{}]: [{}]",
+ path.toUri().getPath(), fs.getUri(), e.getMessage(), e);
+ }
+ }
+ temporaryLocations.clear();
+ }
/**
* Implementations of this interface are allowed to increment queryCount.
* {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface.
* No other core class should implement this interface. Test classes may implement (see ControlsInjectionUtil).
*/
- public static interface QueryCountIncrementer {
- public void increment(final UserSession session);
+ public interface QueryCountIncrementer {
+ void increment(final UserSession session);
}
public static class Builder {
- UserSession userSession;
+ private UserSession userSession;
public static Builder newBuilder() {
return new Builder();
@@ -115,6 +155,9 @@ public class UserSession {
private UserSession() {
queryCount = new AtomicInteger(0);
+ sessionId = UUID.randomUUID().toString();
+ temporaryTables = Maps.newConcurrentMap();
+ temporaryLocations = Maps.newConcurrentMap();
}
public boolean isSupportComplexTypes() {
@@ -197,7 +240,7 @@ public class UserSession {
/**
* Get default schema from current default schema path and given schema tree.
- * @param rootSchema
+ * @param rootSchema root schema
* @return A {@link org.apache.calcite.schema.SchemaPlus} object.
*/
public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) {
@@ -207,18 +250,117 @@ public class UserSession {
return null;
}
- final SchemaPlus defaultSchema = SchemaUtilites.findSchema(rootSchema, defaultSchemaPath);
+ return SchemaUtilites.findSchema(rootSchema, defaultSchemaPath);
+ }
+
+ /**
+ * Currently a stub: both arguments are ignored and {@code true} is always returned.
+ *
+ * @param name option name (unused)
+ * @param value option value (unused)
+ * @return always true
+ */
+ public boolean setSessionOption(String name, String value) {
+ return true;
+ }
+
+ /**
+ * @return unique session identifier (random UUID string generated when the session is constructed)
+ */
+ public String getSessionId() { return sessionId; }
+
+ /**
+ * Creates and adds session temporary location if absent using schema configuration.
+ * Generates temporary table name and stores its original name as key
+ * and generated name as value in session temporary tables cache.
+ * Original temporary name is converted to lower case to achieve case-insensitivity.
+ * If original table name already exists, new name is not regenerated and is reused.
+ * This can happen if default temporary workspace was changed (file system or location) or
+ * orphan temporary table name has remained (name was registered but table creation did not succeed).
+ * Generated name has the form {@code <session id>/<random uuid>} and is composed with
+ * Hadoop {@link Path} so that "/" is used as separator on every operating system.
+ *
+ * @param schema table schema, is cast to {@link WorkspaceSchemaFactory.WorkspaceSchema}
+ * @param tableName original table name
+ * @return generated temporary table name
+ * @throws IOException if error during session temporary location creation
+ */
+ public String registerTemporaryTable(AbstractSchema schema, String tableName) throws IOException {
+ addTemporaryLocation((WorkspaceSchemaFactory.WorkspaceSchema) schema);
+ // java.nio.file.Paths would join the parts with the platform separator (backslash on Windows)
+ String temporaryTableName = new Path(sessionId, UUID.randomUUID().toString()).toUri().getPath();
+ String oldTemporaryTableName = temporaryTables.putIfAbsent(tableName.toLowerCase(), temporaryTableName);
+ return oldTemporaryTableName == null ? temporaryTableName : oldTemporaryTableName;
+ }
+
+ /**
+ * Returns generated temporary table name from the list of session temporary tables, null otherwise.
+ * Original temporary name is converted to lower case to achieve case-insensitivity.
+ *
+ * @param tableName original table name
+ * @return generated temporary table name, or null if no temporary table is registered under this name
+ */
+ public String resolveTemporaryTableName(String tableName) {
+ return temporaryTables.get(tableName.toLowerCase());
+ }
- if (defaultSchema == null) {
- // If the current schema resolves to null, return root schema as the current default schema.
- return defaultSchema;
+ /**
+ * Checks if passed table is temporary, table name is case-insensitive.
+ * Before looking for table checks if passed schema is temporary and returns false if not
+ * since temporary tables are allowed to be created in temporary workspace only.
+ * If passed workspace is temporary, looks for temporary table.
+ * First checks if table name is among temporary tables, if not returns false.
+ * If temporary table name was resolved, checks that temporary table exists on disk,
+ * to ensure that temporary table actually exists and resolved table name is not orphan
+ * (for example, as a result of unsuccessful temporary table creation).
+ *
+ * @param drillSchema table schema
+ * @param config drill config
+ * @param tableName original table name
+ * @return true if temporary table exists in schema, false otherwise
+ */
+ public boolean isTemporaryTable(AbstractSchema drillSchema, DrillConfig config, String tableName) {
+ if (!SchemaUtilites.isTemporaryWorkspace(drillSchema.getFullSchemaName(), config)) {
+ return false;
}
+ String temporaryTableName = resolveTemporaryTableName(tableName);
+ if (temporaryTableName != null) {
+ // the lookup in the schema uses the generated name, not the original one
+ Table temporaryTable = SqlHandlerUtil.getTableFromSchema(drillSchema, temporaryTableName);
+ if (temporaryTable != null && temporaryTable.getJdbcTableType() == Schema.TableType.TABLE) {
+ return true;
+ }
+ }
+ return false;
+ }
- return defaultSchema;
+ /**
+ * Drops temporary table from the passed schema using its generated name and
+ * removes temporary table name from the list of session temporary tables.
+ * Original temporary name is converted to lower case to achieve case-insensitivity.
+ * Does nothing if no temporary table is registered under the passed name.
+ *
+ * @param drillSchema schema the temporary table is dropped from
+ * @param tableName original table name
+ */
+ public void removeTemporaryTable(AbstractSchema drillSchema, String tableName) {
+ String temporaryTable = resolveTemporaryTableName(tableName);
+ if (temporaryTable == null) {
+ return;
+ }
+ // the name is unregistered only after the drop call has returned without throwing
+ SqlHandlerUtil.dropTableFromSchema(drillSchema, temporaryTable);
+ temporaryTables.remove(tableName.toLowerCase());
}
- public boolean setSessionOption(String name, String value) {
- return true;
+ /**
+ * Session temporary tables are stored under temporary workspace location in session folder
+ * defined by unique session id. These session temporary locations are deleted on session close.
+ * If default temporary workspace file system or location is changed at runtime,
+ * new session temporary location will be added with corresponding file system
+ * to the list of session temporary locations. If location does not exist it will be created and
+ * {@link StorageStrategy#TEMPORARY} storage rules will be applied to it.
+ * The location is composed from the file system URI, the workspace default location and the session id
+ * using Hadoop {@link Path}, which keeps the URI scheme and authority intact.
+ *
+ * @param temporaryWorkspace temporary workspace
+ * @throws IOException in case of error during temporary location creation
+ * @throws IllegalArgumentException if the location does not exist after creation attempt
+ */
+ private void addTemporaryLocation(WorkspaceSchemaFactory.WorkspaceSchema temporaryWorkspace) throws IOException {
+ DrillFileSystem fs = temporaryWorkspace.getFS();
+ // java.nio.file.Paths must not be used here: it treats the URI as a local path,
+ // collapsing "//" after the scheme and joining the parts with the platform separator
+ Path temporaryLocation = new Path(fs.getUri().toString(),
+ new Path(temporaryWorkspace.getDefaultLocation(), sessionId));
+
+ FileSystem fileSystem = temporaryLocations.putIfAbsent(temporaryLocation, fs);
+
+ // only the caller that registered the location first creates it
+ if (fileSystem == null) {
+ StorageStrategy.TEMPORARY.createPathAndApply(fs, temporaryLocation);
+ Preconditions.checkArgument(fs.exists(temporaryLocation),
+ "Temporary location should exist [%s]", temporaryLocation.toUri().getPath());
+ }
}
private String getProp(String key) {
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 547915e..25776ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.drill.exec.server;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.config.DrillConfig;
@@ -30,13 +31,17 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaTreeProvider;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
@@ -123,6 +128,7 @@ public class Drillbit implements AutoCloseable {
storageRegistry.init();
drillbitContext.getOptionManager().init();
javaPropertiesToSystemOptions();
+ validateTemporaryWorkspace(manager.getContext());
manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord);
registrationHandle = coord.register(md);
webServer.start();
@@ -215,6 +221,21 @@ public class Drillbit implements AutoCloseable {
}
/**
+ * Validates that temporary workspace indicated in configuration is
+ * mutable and file-based (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}).
+ *
+ * @param context drillbit context
+ * @throws Exception in case when temporary table schema is not mutable or
+ * not file-based (instance of WorkspaceSchemaFactory.WorkspaceSchema)
+ */
+ private void validateTemporaryWorkspace(DrillbitContext context) throws Exception {
+ try (SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(context)) {
+ final SchemaPlus rootSchema = schemaTreeProvider.createRootSchema(context.getOptionManager());
+ // the returned workspace is intentionally discarded: the lookup is made only for its validation effect
+ SchemaUtilites.getTemporaryWorkspace(rootSchema, context.getConfig());
+ }
+ }
+
+ /**
* Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
* occur during closure, as well as the location the drillbit was started from.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 7a16d0a..618841b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -117,18 +117,33 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
}
/**
+ * Creates table entry using table name, list of partition columns
+ * and storage strategy used to create table folder and files
*
* @param tableName : new table name.
* @param partitionColumns : list of partition columns. Empty list if there is no partition columns.
- * @return
+ * @param storageStrategy : storage strategy used to create table folder and files
+ * @return create table entry
*/
- public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
throw UserException.unsupportedError()
.message("Creating new tables is not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
+ * Creates table entry using table name and list of partition columns if any.
+ * Table folder and files will be created using persistent storage strategy.
+ *
+ * @param tableName : new table name.
+ * @param partitionColumns : list of partition columns. Empty list if there are no partition columns.
+ * @return create table entry
+ */
+ public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+ return createNewTable(tableName, partitionColumns, StorageStrategy.PERSISTENT);
+ }
+
+ /**
* Reports whether to show items from this schema in INFORMATION_SCHEMA
* tables.
* (Controls ... TODO: Doc.: Mention what this typically controls or
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
index d05cc43..4f426bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,10 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ViewExpansionContext;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
import org.apache.drill.exec.util.ImpersonationUtil;
@@ -49,6 +52,33 @@ public class SchemaTreeProvider implements AutoCloseable {
}
/**
+ * Return root schema for process user.
+ *
+ * @param options list of options
+ * @return root of the schema tree
+ */
+ public SchemaPlus createRootSchema(final OptionManager options) {
+ SchemaConfigInfoProvider schemaConfigInfoProvider = new SchemaConfigInfoProvider() {
+
+ @Override
+ public ViewExpansionContext getViewExpansionContext() {
+ throw new UnsupportedOperationException("View expansion context is not supported");
+ }
+
+ @Override
+ public OptionValue getOption(String optionKey) {
+ return options.getOption(optionKey);
+ }
+ };
+
+ final SchemaConfig schemaConfig = SchemaConfig.newBuilder(
+ ImpersonationUtil.getProcessUserName(), schemaConfigInfoProvider)
+ .build();
+
+ return createRootSchema(schemaConfig);
+ }
+
+ /**
* Return root schema with schema owner as the given user.
*
* @param userName Name of the user who is accessing the storage sources.
[4/5] drill git commit: DRILL-5104: Foreman should not set external
sort memory for a physical plan
Posted by jn...@apache.org.
DRILL-5104: Foreman should not set external sort memory for a physical plan
Physical plans include a plan for memory allocations. However, the code
path in Foreman replans external sort memory, even for a physical plan.
This makes it impossible to use a physical plan to test memory
configuration.
This change avoids changing memory settings in a physical plan, while
preserving the adjustments for logical plans or SQL queries.
Revised to put a property in the plan itself. Old plans, and those
generated from SQL, will have memory allocations applied. Plans
marked as already "resource management" planned will be used as-is.
Includes a unit test that demonstrates the new behavior.
close apache/drill#703
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/82176976
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/82176976
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/82176976
Branch: refs/heads/master
Commit: 8217697647e7ed10c52cfa91b860730302a339e8
Parents: 08ca5e0
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Dec 13 14:36:42 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:39 2017 -0800
----------------------------------------------------------------------
.../apache/drill/exec/physical/PhysicalPlan.java | 3 ---
.../exec/util/MemoryAllocationUtilities.java | 9 ++++++++-
.../apache/drill/exec/work/foreman/Foreman.java | 1 +
.../impl/xsort/TestSimpleExternalSort.java | 6 ++++--
.../resources/xsort/one_key_sort_descending.json | 3 ++-
.../drill/common/logical/PlanProperties.java | 18 ++++++++++++++++--
6 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index 78b882b..e0902c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -62,10 +62,8 @@ public class PhysicalPlan {
}else{
return list;
}
-
}
-
@JsonProperty("head")
public PlanProperties getProperties() {
return properties;
@@ -89,5 +87,4 @@ public class PhysicalPlan {
throw new RuntimeException(e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 38dfcd0..678167f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -41,6 +41,13 @@ public class MemoryAllocationUtilities {
* @param queryContext
*/
public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+
+ // Test plans may already have a pre-defined memory plan.
+ // Otherwise, determine memory allocation.
+
+ if (plan.getProperties().hasResourcePlan) {
+ return;
+ }
// look for external sorts
final List<ExternalSort> sortList = new LinkedList<>();
for (final PhysicalOperator op : plan.getSortedOperators()) {
@@ -64,6 +71,6 @@ public class MemoryAllocationUtilities {
externalSort.setMaxAllocation(maxSortAlloc);
}
}
+ plan.getProperties().hasResourcePlan = true;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c6a3104..30718b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -774,6 +774,7 @@ public class Foreman implements Runnable {
}
}
+ @SuppressWarnings("resource")
@Override
public void close() {
Preconditions.checkState(!isClosed);
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index b34a466..85975cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -42,7 +42,6 @@ import org.junit.rules.TestRule;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-@Ignore
public class TestSimpleExternalSort extends BaseTestQuery {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
DrillConfig c = DrillConfig.create();
@@ -50,6 +49,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
@Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
+ @Ignore
@Test
public void mergeSortWithSv2() throws Exception {
List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json");
@@ -109,7 +109,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
for (QueryDataBatch b : results) {
if (b.getHeader().getRowCount() == 0) {
- break;
+ continue;
}
batchCount++;
RecordBatchLoader loader = new RecordBatchLoader(allocator);
@@ -132,6 +132,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
}
@Test
+ @Ignore
public void sortOneKeyDescendingExternalSort() throws Throwable{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -186,6 +187,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
}
@Test
+ @Ignore
public void outOfMemoryExternalSort() throws Throwable{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
index f4eab5d..b4794ad 100644
--- a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
+++ b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
@@ -4,7 +4,8 @@
version:"1",
generator:{
type:"manual"
- }
+ },
+ hasResourcePlan: true
},
graph:[
{
http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index ce9603e..f4de0eb 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -35,6 +35,12 @@ public class PlanProperties {
public JSONOptions options;
public int queue;
+ /**
+ * Indicates if the plan has been planned for resource management
+ * (memory, etc.) or if this plan must still be computed.
+ */
+ public boolean hasResourcePlan;
+
// @JsonInclude(Include.NON_NULL)
public static class Generator {
public String type;
@@ -55,7 +61,8 @@ public class PlanProperties {
@JsonProperty("type") PlanType type,
@JsonProperty("mode") ResultMode resultMode,
@JsonProperty("options") JSONOptions options,
- @JsonProperty("queue") int queue
+ @JsonProperty("queue") int queue,
+ @JsonProperty("hasResourcePlan") boolean hasResourcePlan
) {
this.version = version;
this.queue = queue;
@@ -63,6 +70,7 @@ public class PlanProperties {
this.type = type;
this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
this.options = options;
+ this.hasResourcePlan = hasResourcePlan;
}
public static PlanPropertiesBuilder builder() {
@@ -76,6 +84,7 @@ public class PlanProperties {
private ResultMode mode = ResultMode.EXEC;
private JSONOptions options;
private int queueNumber = 0;
+ private boolean hasResourcePlan = false;
public PlanPropertiesBuilder type(PlanType type) {
this.type = type;
@@ -112,8 +121,13 @@ public class PlanProperties {
return this;
}
+ public PlanPropertiesBuilder generator(boolean hasResourcePlan) {
+ this.hasResourcePlan = hasResourcePlan;
+ return this;
+ }
+
public PlanProperties build() {
- return new PlanProperties(version, generator, type, mode, options, queueNumber);
+ return new PlanProperties(version, generator, type, mode, options, queueNumber, hasResourcePlan);
}
}
[3/5] drill git commit: DRILL-5097: Using
store.parquet.reader.int96_as_timestamp gives IOOB whereas convert_from works
Posted by jn...@apache.org.
DRILL-5097: Using store.parquet.reader.int96_as_timestamp gives IOOB whereas convert_from works
close apache/drill#697
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/08ca5e09
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/08ca5e09
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/08ca5e09
Branch: refs/heads/master
Commit: 08ca5e0923436ebeb3e190c60892167d9beab0c2
Parents: bb29f19
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Wed Dec 14 16:24:08 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:20 2017 -0800
----------------------------------------------------------------------
.../NullableFixedByteAlignedReaders.java | 10 +++++++++-
.../physical/impl/writer/TestParquetWriter.java | 19 +++++++++++++++++++
.../test/resources/parquet/data.snappy.parquet | Bin 0 -> 59200 bytes
3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index e20504f..b233a65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -110,9 +110,14 @@ public class NullableFixedByteAlignedReaders {
/**
* Class for reading parquet fixed binary type INT96, which is used for storing hive,
- * impala timestamp values with nanoseconds precision. So it reads such values as a drill timestamp.
+ * impala timestamp values with nanoseconds precision (12 bytes). So it reads such values as a drill timestamp (8 bytes).
*/
static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> {
+ /**
+ * The width of each element of the TimeStampVector is 8 bytes.
+ */
+ private static final int TIMESTAMP_LENGTH_IN_BITS = 64;
+
NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
@@ -132,6 +137,9 @@ public class NullableFixedByteAlignedReaders {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
}
}
+ // The nanos precision is cut to millis. Therefore the length of a single timestamp value is 8 bytes
+ // instead of 12 bytes.
+ dataTypeLengthInBits = TIMESTAMP_LENGTH_IN_BITS;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index ae0e699..362c943 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileWriter;
import java.math.BigDecimal;
import java.sql.Date;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
@@ -963,5 +964,23 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
+ @Test // DRILL-5097
+ public void testInt96TimeStampValueWidth() throws Exception {
+ try {
+ testBuilder()
+ .ordered()
+ .sqlQuery("select c, d from cp.`parquet/data.snappy.parquet` where d = '2015-07-18 13:52:51'")
+ .optionSettingQueriesForTestQuery(
+ "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+ .baselineColumns("c", "d")
+ .baselineValues(new DateTime(Date.valueOf("2011-04-11").getTime()),
+ new DateTime(Timestamp.valueOf("2015-07-18 13:52:51").getTime()))
+ .build()
+ .run();
+ } finally {
+ test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/test/resources/parquet/data.snappy.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/data.snappy.parquet b/exec/java-exec/src/test/resources/parquet/data.snappy.parquet
new file mode 100644
index 0000000..1ce3d75
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/data.snappy.parquet differ