Posted to commits@drill.apache.org by jn...@apache.org on 2017/01/24 06:15:21 UTC

[1/5] drill git commit: DRILL-4956: Temporary tables support

Repository: drill
Updated Branches:
  refs/heads/master 8a4d7a994 -> 2af709f43


http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
new file mode 100644
index 0000000..a125bae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Contains the list of parameters used to store paths / files on a file system. */
+public class StorageStrategy {
+
+  /**
+   * Primarily used for persistent tables.
+   * For directories: drwxrwxr-x (owner and group have full access, others can read and execute).
+   * For files: -rw-r--r-- (owner can read and write, group and others can read).
+   * Folders and files are not deleted on file system close.
+   */
+  public static final StorageStrategy PERSISTENT = new StorageStrategy("775", "644", false);
+
+  /**
+   * Primarily used for temporary tables.
+   * For directories: drwx------ (owner has full access, group and others have no access).
+   * For files: -rw------- (owner can read and write, group and others have no access).
+   * Folders and files are deleted on file system close.
+   */
+  public static final StorageStrategy TEMPORARY = new StorageStrategy("700", "600", true);
+
+  private final String folderPermission;
+  private final String filePermission;
+  private final boolean deleteOnExit;
+
+  @JsonCreator
+  public StorageStrategy(@JsonProperty("folderPermission") String folderPermission,
+                         @JsonProperty("filePermission") String filePermission,
+                         @JsonProperty("deleteOnExit") boolean deleteOnExit) {
+    this.folderPermission = folderPermission;
+    this.filePermission = filePermission;
+    this.deleteOnExit = deleteOnExit;
+  }
+
+  public String getFolderPermission() {
+    return folderPermission;
+  }
+
+  public String getFilePermission() { return filePermission; }
+
+  public boolean isDeleteOnExit() {
+    return deleteOnExit;
+  }
+
+  /**
+   * Creates the passed path on the given file system.
+   * Before creation, checks which parent directories do not exist.
+   * Applies storage strategy rules to all newly created directories.
+   * Returns the first created path, or null if the path already existed.
+   *
+   * Case 1: /a/b -> already exists, attempt to create /a/b/c/d
+   * Will create path and return /a/b/c.
+   * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/d
+   * Will create path and return /a/b/c/d.
+   * Case 3: /a/b/c/d -> already exists, will return null.
+   *
+   * @param fs file system where the path should be located
+   * @param path location path
+   * @return first created path, or null if no new paths were created
+   * @throws IOException if there are problems while creating the path, setting permission
+   *         or adding the path to the delete-on-exit list
+   */
+  public Path createPathAndApply(FileSystem fs, Path path) throws IOException {
+    List<Path> locations = getNonExistentLocations(fs, path);
+    if (locations.isEmpty()) {
+      return null;
+    }
+    fs.mkdirs(path);
+    for (Path location : locations) {
+      applyStrategy(fs, location, folderPermission, deleteOnExit);
+    }
+    return locations.get(locations.size() - 1);
+  }
+
+  /**
+   * Creates the passed file on the given file system.
+   * Before creation, checks which parent directories do not exist.
+   * Applies storage strategy rules to all newly created directories and to the file.
+   * Returns the first created parent path, or the file itself if no new parent paths were created.
+   *
+   * Case 1: /a/b -> already exists, attempt to create /a/b/c/some_file.txt
+   * Will create file and return /a/b/c.
+   * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/some_file.txt
+   * Will create file and return /a/b/c/some_file.txt.
+   * Case 3: /a/b/c/some_file.txt -> already exists, will fail.
+   *
+   * @param fs file system where the file should be located
+   * @param file file path
+   * @return first created parent path, or the file if no new parent paths were created
+   * @throws IOException if there are problems while creating the file, setting permission
+   *         or adding the path to the delete-on-exit list
+   */
+  public Path createFileAndApply(FileSystem fs, Path file) throws IOException {
+    List<Path> locations = getNonExistentLocations(fs, file.getParent());
+    if (!fs.createNewFile(file)) {
+      throw new IOException(String.format("File [%s] already exists on file system [%s].",
+          file.toUri().getPath(), fs.getUri()));
+    }
+    applyToFile(fs, file);
+
+    if (locations.isEmpty()) {
+      return file;
+    }
+
+    for (Path location : locations) {
+      applyStrategy(fs, location, folderPermission, deleteOnExit);
+    }
+    return locations.get(locations.size() - 1);
+  }
+
+  /**
+   * Applies the storage strategy to a file:
+   * sets permission and adds the file to the file system delete-on-exit list if needed.
+   *
+   * @param fs file system
+   * @param file path to file
+   * @throws IOException if there are problems while setting permission
+   *         or adding the file to the delete-on-exit list
+   */
+  public void applyToFile(FileSystem fs, Path file) throws IOException {
+    applyStrategy(fs, file, filePermission, deleteOnExit);
+  }
+
+  /**
+   * Returns the list of parent locations that do not exist, including the initial location.
+   * The first element in the list is the initial location,
+   * the last is the outermost parent location that does not exist.
+   * If all locations exist, an empty list is returned.
+   *
+   * Case 1: if /a/b exists and passed location is /a/b/c/d,
+   * will return list with two elements: 0 -> /a/b/c/d, 1 -> /a/b/c
+   * Case 2: if /a/b exists and passed location is /a/b, will return empty list.
+   *
+   * @param fs file system where the locations are checked
+   * @param path location path
+   * @return list of locations that do not exist
+   * @throws IOException in case of troubles accessing file system
+   */
+  private List<Path> getNonExistentLocations(FileSystem fs, Path path) throws IOException {
+    List<Path> locations = Lists.newArrayList();
+    Path starting = path;
+    while (starting != null && !fs.exists(starting)) {
+      locations.add(starting);
+      starting = starting.getParent();
+    }
+    return locations;
+  }
+
+  /**
+   * Applies the storage strategy to the passed path on the passed file system.
+   * Sets the appropriate permission
+   * and adds the path to the file system delete-on-exit list if needed.
+   *
+   * @param fs file system where the path is located
+   * @param path path location
+   * @param permission permission to be applied
+   * @param deleteOnExit whether to delete the path on exit
+   * @throws IOException if there are problems while setting permission
+   *         or adding the path to the delete-on-exit list
+   */
+  private void applyStrategy(FileSystem fs, Path path, String permission, boolean deleteOnExit) throws IOException {
+    fs.setPermission(path, new FsPermission(permission));
+    if (deleteOnExit) {
+      fs.deleteOnExit(path);
+    }
+  }
+}
\ No newline at end of file
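
A short usage sketch of the new class may help (illustrative only, not part of the commit; assumes a local Hadoop FileSystem and a writable /tmp, with hypothetical paths):

    // minimal sketch: create a temporary output file under TEMPORARY rules
    FileSystem fs = FileSystem.get(new Configuration());
    Path outputFile = new Path("/tmp/drill_session/some_table/0_0_0.json");

    // applies 700/600 permissions and registers created paths for delete-on-exit
    Path firstCreated = StorageStrategy.TEMPORARY.createFileAndApply(fs, outputFile);

    // if the empty file is later overwritten (e.g. by fs.create),
    // permissions must be re-applied, as the record writers below do
    StorageStrategy.TEMPORARY.applyToFile(fs, outputFile);

    // on abort, deleting firstCreated removes everything this call created
    fs.delete(firstCreated, true);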

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index e502e99..2110f38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -63,8 +63,8 @@ public class SubSchemaWrapper extends AbstractSchema {
   }
 
   @Override
-  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
-    return innerSchema.createNewTable(tableName, partitionColumns);
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+    return innerSchema.createNewTable(tableName, partitionColumns, storageStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index e0f5438..3a89591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -288,6 +288,9 @@ public class FileSelection {
     }
     final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
     logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+    if (fileSel == null) {
+      return null;
+    }
     fileSel.setHadWildcard(hasWildcard);
     return fileSel;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 526dfb1..e3e01c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,12 +24,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -150,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
-      return defaultSchema.createNewTable(tableName, partitionColumns);
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+      return defaultSchema.createNewTable(tableName, partitionColumns, storageStrategy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index dac313b..8416ed8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -53,13 +53,13 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.dotdrill.DotDrillUtil;
 import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -522,7 +522,6 @@ public class WorkspaceSchemaFactory {
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
       }
-
       return tables.get(tableKey);
     }
 
@@ -540,7 +539,7 @@ public class WorkspaceSchemaFactory {
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName, List<String> partitonColumns) {
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
       String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
       if (formatPlugin == null) {
@@ -553,7 +552,8 @@ public class WorkspaceSchemaFactory {
           (FileSystemConfig) plugin.getConfig(),
           formatPlugin,
           config.getLocation() + Path.SEPARATOR + tableName,
-          partitonColumns);
+          partitionColumns,
+          storageStrategy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index db22568..52ce8b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -48,6 +48,7 @@ public class EasyWriter extends AbstractWriter {
       @JsonProperty("child") PhysicalOperator child,
       @JsonProperty("location") String location,
       @JsonProperty("partitionColumns") List<String> partitionColumns,
+      @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
       @JsonProperty("storage") StoragePluginConfig storageConfig,
       @JsonProperty("format") FormatPluginConfig formatConfig,
       @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -57,6 +58,7 @@ public class EasyWriter extends AbstractWriter {
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
     this.partitionColumns = partitionColumns;
+    setStorageStrategy(storageStrategy);
   }
 
   public EasyWriter(PhysicalOperator child,
@@ -92,7 +94,9 @@ public class EasyWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new EasyWriter(child, location, partitionColumns, formatPlugin);
+    EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin);
+    writer.setStorageStrategy(getStorageStrategy());
+    return writer;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 30c248e..58ca95f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -83,7 +83,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
     options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
 
-    RecordWriter recordWriter = new JsonRecordWriter();
+    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index f27e04c..c37da8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
@@ -46,6 +46,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
   private static final String LINE_FEED = String.format("%n");
 
+  private Path cleanUpLocation;
   private String location;
   private String prefix;
 
@@ -58,11 +59,13 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private FSDataOutputStream stream = null;
 
   private final JsonFactory factory = new JsonFactory();
+  private final StorageStrategy storageStrategy;
 
   // Record write status
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
 
-  public JsonRecordWriter(){
+  public JsonRecordWriter(StorageStrategy storageStrategy){
+    this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
   }
 
   @Override
@@ -81,7 +84,17 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
+      // json writer does not support partitions, so only one file can be created
+      // and thus only one location should be deleted in case of abort
+      // to ensure that our writer was the first to create output file,
+      // we create empty output file first and fail if file exists
+      cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+      // since empty output file will be overwritten (some file systems may restrict append option)
+      // we need to re-apply file permission
       stream = fs.create(fileName);
+      storageStrategy.applyToFile(fs, fileName);
+
       JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter();
       if (uglify) {
         generator = generator.setPrettyPrinter(new MinimalPrettyPrinter(LINE_FEED));
@@ -238,6 +251,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
   @Override
   public void abort() throws IOException {
+    if (cleanUpLocation != null) {
+      fs.delete(cleanUpLocation, true);
+      logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+          cleanUpLocation.toUri().getPath(), fs.getUri());
+    }
   }
 
   @Override
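
Both the JSON writer here and the text writer below now share the same two-step creation pattern; a condensed sketch (variable setup assumed, not verbatim commit code):

    // fail fast if the file already exists, and remember the first
    // created path so abort() can delete exactly what this writer made
    cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);

    // fs.create() overwrites the empty file (append may be unsupported),
    // so the strategy's file permission is applied again afterwards
    FSDataOutputStream stream = fs.create(fileName);
    storageStrategy.applyToFile(fs, fileName);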

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 6542ad4..a9a30e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -125,7 +125,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
     options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator());
+    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4ee863a..a25699d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -80,6 +81,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public static final String DRILL_VERSION_PROPERTY = "drill.version";
   public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
 
+  private final StorageStrategy storageStrategy;
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
   private Map<String, String> extraMetaData = new HashMap<>();
@@ -101,7 +103,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private BatchSchema batchSchema;
 
   private Configuration conf;
+  private FileSystem fs;
   private String location;
+  private List<Path> cleanUpLocations;
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
@@ -117,6 +121,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
     this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
     this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
+    this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.PERSISTENT : writer.getStorageStrategy();
+    this.cleanUpLocations = Lists.newArrayList();
   }
 
   @Override
@@ -126,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    fs = FileSystem.get(conf);
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
     pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
     dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
@@ -363,7 +370,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // we wait until there is at least one record before creating the parquet file
     if (parquetFileWriter == null) {
       Path path = new Path(location, prefix + "_" + index + ".parquet");
-      parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+      // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+      Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+      // since the parquet writer supports partitions, several output files may be created
+      // if this writer was the one to create the table folder, we store only the folder and delete it with its content in case of abort
+      // if the table location was created before, we store only the files created by this writer and delete them in case of abort
+      addCleanUpLocation(fs, firstCreatedPath);
+
+      // since ParquetFileWriter will overwrite empty output file (append is not supported)
+      // we need to re-apply file permission
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+      storageStrategy.applyToFile(fs, path);
+
       parquetFileWriter.start();
     }
 
@@ -374,6 +393,24 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void abort() throws IOException {
+    List<String> errors = Lists.newArrayList();
+    for (Path location : cleanUpLocations) {
+      try {
+        if (fs.exists(location)) {
+          fs.delete(location, false);
+          logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+              location.toUri().getPath(), fs.getUri());
+        }
+      } catch (IOException e) {
+        errors.add(location.toUri().getPath());
+        logger.error("Failed to delete location [{}] on file system [{}].",
+            location, fs.getUri(), e);
+      }
+    }
+    if (!errors.isEmpty()) {
+      throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" +
+          " during aborting writer", errors, fs.getUri()));
+    }
   }
 
   @Override
@@ -382,4 +419,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     codecFactory.release();
   }
+
+  /**
+   * Adds the passed location to the list of locations to be cleaned up in case of abort.
+   * The location is added only if:
+   * <li>no locations were added before</li>
+   * <li>the first added location is a file</li>
+   *
+   * If the first added location is a folder, we don't add other locations (which can only be files),
+   * since this writer was the one to create the main folder where the files are located;
+   * on abort we'll delete this folder with its content.
+   *
+   * If the first location is a file, we add the other files as well, since this writer didn't create
+   * the main folder and on abort we need to delete only the created files, not the whole folder.
+   *
+   * @param fs file system where the location is created
+   * @param location passed location
+   * @throws IOException if checking whether the passed location is a file fails
+   */
+  private void addCleanUpLocation(FileSystem fs, Path location) throws IOException {
+    if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) {
+      cleanUpLocations.add(location);
+    }
+  }
 }
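
The cleanup bookkeeping in addCleanUpLocation is subtle; a worked sketch of the two cases (hypothetical paths, not part of the commit):

    // Case A: this writer created the table folder /tmp/t.
    // createFileAndApply returned the folder, so it is recorded first;
    // fs.isFile(cleanUpLocations.get(0)) is false, so later partition
    // files are not recorded, and deleting /tmp/t on abort removes them all.

    // Case B: /tmp/t already existed.
    // createFileAndApply returned the file itself (/tmp/t/0_0_0.parquet);
    // fs.isFile(...) is true, so every file this writer creates is
    // recorded and deleted individually on abort, leaving /tmp/t intact.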

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 716c56d..522c678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -61,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
           @JsonProperty("child") PhysicalOperator child,
           @JsonProperty("location") String location,
           @JsonProperty("partitionColumns") List<String> partitionColumns,
+          @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
           @JsonProperty("storage") StoragePluginConfig storageConfig,
           @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
 
@@ -69,6 +70,7 @@ public class ParquetWriter extends AbstractWriter {
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
     this.partitionColumns = partitionColumns;
+    setStorageStrategy(storageStrategy);
   }
 
   public ParquetWriter(PhysicalOperator child,
@@ -109,7 +111,9 @@ public class ParquetWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    writer.setStorageStrategy(getStorageStrategy());
+    return writer;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 8a74b49..d65a3eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -37,6 +37,10 @@ import com.google.common.base.Joiner;
 public class DrillTextRecordWriter extends StringOutputRecordWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
 
+  private final StorageStrategy storageStrategy;
+
+  private Path cleanUpLocation;
+
   private String location;
   private String prefix;
 
@@ -52,8 +56,9 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record separated by field delimiter
 
-  public DrillTextRecordWriter(BufferAllocator allocator) {
+  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
     super(allocator);
+    this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
   }
 
   @Override
@@ -79,7 +84,17 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
     // open a new file for writing data with new schema
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
+      // drill text writer does not support partitions, so only one file can be created
+      // and thus only one location should be deleted in case of abort
+      // to ensure that our writer was the first to create output file,
+      // we create empty output file first and fail if file exists
+      cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+      // since the empty output file will be overwritten (some file systems do not support append)
+      // we need to re-apply file permission
       DataOutputStream fos = fs.create(fileName);
+      storageStrategy.applyToFile(fs, fileName);
+
       stream = new PrintStream(fos);
       logger.debug("Created file: {}", fileName);
     } catch (IOException ex) {
@@ -160,12 +175,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
 
   @Override
   public void abort() throws IOException {
-    cleanup();
-    try {
-      fs.delete(new Path(location), true);
-    } catch (IOException ex) {
-      logger.error("Abort failed. There could be leftover output files");
-      throw ex;
+    if (cleanUpLocation != null) {
+      fs.delete(cleanUpLocation, true);
+      logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+          cleanUpLocation.toUri().getPath(), fs.getUri());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 01e4be0..735ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -203,7 +203,7 @@ drill.exec: {
   scan: {
     threadpool_size: 8,
     decode_threadpool_size: 1
-  }
+  },
   udf: {
     retry-attempts: 5,
     directory: {
@@ -227,7 +227,11 @@ drill.exec: {
       registry: ${drill.exec.udf.directory.base}"/registry",
       tmp: ${drill.exec.udf.directory.base}"/tmp"
     }
-  }
+  },
+  # Temporary tables can be created ONLY in the default temporary workspace.
+  # The full workspace name must be given (schema and workspace separated by a dot).
+  # The workspace MUST be file-based and writable. The workspace name is case-sensitive.
+  default_temporary_workspace: "dfs.tmp"
 }
 
 drill.jdbc: {
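
To relocate temporary tables, the new option can be overridden in drill-override.conf; a minimal sketch, assuming a writable file-based workspace named dfs.scratch has already been defined:

    drill.exec: {
      default_temporary_workspace: "dfs.scratch"
    }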

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 93916e9..fb84088 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -84,6 +84,7 @@ public class BaseTestQuery extends ExecTest {
     {
       put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
       put(ExecConstants.HTTP_ENABLE, "false");
+      put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA);
     }
   };
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index e9a38b0..acbf2e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -177,7 +177,7 @@ public class TestDropTable extends PlanTestBase {
 
   @Test // DRILL-4673
   public void testDropTableIfExistsWhileTableExists() throws Exception {
-    final String existentTableName = "test_table";
+    final String existentTableName = "test_table_exists";
     test("use dfs_test.tmp");
 
     // successful dropping of existent table
@@ -192,7 +192,7 @@ public class TestDropTable extends PlanTestBase {
 
   @Test // DRILL-4673
   public void testDropTableIfExistsWhileTableDoesNotExist() throws Exception {
-    final String nonExistentTableName = "test_table";
+    final String nonExistentTableName = "test_table_not_exists";
     test("use dfs_test.tmp");
 
     // dropping of non existent table without error
@@ -200,7 +200,7 @@ public class TestDropTable extends PlanTestBase {
         .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, nonExistentTableName))
         .unOrdered()
         .baselineColumns("ok", "summary")
-        .baselineValues(true, String.format("Table [%s] not found", nonExistentTableName))
+        .baselineValues(false, String.format("Table [%s] not found", nonExistentTableName))
         .go();
   }
 
@@ -216,7 +216,7 @@ public class TestDropTable extends PlanTestBase {
           .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, viewName))
           .unOrdered()
           .baselineColumns("ok", "summary")
-          .baselineValues(true, String.format("Table [%s] not found", viewName))
+          .baselineValues(false, String.format("Table [%s] not found", viewName))
           .go();
     } finally {
       test(String.format(DROP_VIEW_IF_EXISTS, viewName));

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
new file mode 100644
index 0000000..f5d45b0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TemporaryTablesAutomaticDropTest extends BaseTestQuery {
+
+  private static final String session_id = "sessionId";
+
+  @Before
+  public void init() throws Exception {
+    new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return UUID.nameUUIDFromBytes(session_id.getBytes());
+      }
+    };
+    updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+  }
+
+  @Test
+  public void testAutomaticDropWhenClientIsClosed() throws Exception {
+    File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed",
+            getDfsTestTmpSchemaLocation());
+    updateClient("new_client");
+    assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+  }
+
+  @Test
+  public void testAutomaticDropWhenDrillbitIsClosed() throws Exception {
+    File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed",
+            getDfsTestTmpSchemaLocation());
+    bits[0].close();
+    assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+  }
+
+  @Test
+  public void testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception {
+    File firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location",
+            getDfsTestTmpSchemaLocation());
+    StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    String tempDir = TestUtilities.createTempDir();
+    try {
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tempDir);
+      File secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir);
+      updateClient("new_client");
+      assertFalse("First session temporary location should be absent", firstSessionTemporaryLocation.exists());
+      assertFalse("Second session temporary location should be absent", secondSessionTemporaryLocation.exists());
+    } finally {
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, getDfsTestTmpSchemaLocation());
+    }
+  }
+
+  private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
+    String temporaryTableName = "temporary_table_automatic_drop_" + suffix;
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    File sessionTemporaryLocation = new File(schemaLocation,
+            UUID.nameUUIDFromBytes(session_id.getBytes()).toString());
+    assertTrue("Session temporary location should exist", sessionTemporaryLocation.exists());
+    return sessionTemporaryLocation;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
index 43d8d57..5bf55af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -143,7 +143,7 @@ public class TestBaseViewSupport extends BaseTestQuery {
           .sqlQuery(String.format("DROP VIEW IF EXISTS %s", viewFullName))
           .unOrdered()
           .baselineColumns("ok", "summary")
-          .baselineValues(true, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
+          .baselineValues(false, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
           .go();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
new file mode 100644
index 0000000..93c8cad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TestCTTAS extends BaseTestQuery {
+
+  private static final UUID session_id = UUID.nameUUIDFromBytes("sessionId".getBytes());
+  private static final String test_schema = "dfs_test";
+  private static final String temp2_wk = "tmp2";
+  private static final String temp2_schema = String.format("%s.%s", test_schema, temp2_wk);
+
+  private static FileSystem fs;
+  private static FsPermission expectedFolderPermission;
+  private static FsPermission expectedFilePermission;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    MockUp<UUID> uuidMockUp = mockRandomUUID(session_id);
+    updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+    uuidMockUp.tearDown();
+
+    StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(test_schema).getConfig();
+    pluginConfig.workspaces.put(temp2_wk, new WorkspaceConfig(TestUtilities.createTempDir(), true, null));
+    pluginRegistry.createOrUpdate(test_schema, pluginConfig, true);
+
+    fs = FileSystem.get(new Configuration());
+    expectedFolderPermission = new FsPermission(StorageStrategy.TEMPORARY.getFolderPermission());
+    expectedFilePermission = new FsPermission(StorageStrategy.TEMPORARY.getFilePermission());
+  }
+
+  private static MockUp<UUID> mockRandomUUID(final UUID uuid) {
+    return new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return uuid;
+      }
+    };
+  }
+
+  @Test
+  public void testSyntax() throws Exception {
+    test("create TEMPORARY table temporary_keyword as select 1 from (values(1))");
+    test("create TEMPORARY table temporary_keyword_with_wk as select 1 from (values(1))", TEMP_SCHEMA);
+  }
+
+  @Test
+  public void testCreateTableWithDifferentStorageFormats() throws Exception {
+    List<String> storageFormats = Lists.newArrayList("parquet", "json", "csvh");
+
+    try {
+      for (String storageFormat : storageFormats) {
+        String temporaryTableName = "temp_" + storageFormat;
+        mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+        test("alter session set `store.format`='%s'", storageFormat);
+        test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+        checkPermission(temporaryTableName);
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .baselineColumns("c1")
+            .baselineValues("A")
+            .go();
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .sqlBaselineQuery("select * from %s.%s", TEMP_SCHEMA, temporaryTableName)
+            .go();
+      }
+    } finally {
+      test("alter session reset `store.format`");
+    }
+  }
+
+  @Test
+  public void testTemporaryTablesCaseInsensitivity() throws Exception {
+    String temporaryTableName = "tEmP_InSeNSiTiVe";
+    List<String> temporaryTableNames = Lists.newArrayList(
+        temporaryTableName,
+        temporaryTableName.toLowerCase(),
+        temporaryTableName.toUpperCase());
+
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    for (String tableName : temporaryTableNames) {
+      testBuilder()
+          .sqlQuery("select * from %s", tableName)
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("A")
+          .go();
+    }
+  }
+
+  @Test
+  public void testPartitionByWithTemporaryTables() throws Exception {
+    String temporaryTableName = "temporary_table_with_partitions";
+    mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+    test("create TEMPORARY table %s partition by (c1) as select * from (" +
+        "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName);
+    checkPermission(temporaryTableName);
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception {
+    try {
+      String temporaryTableName = "temporary_table_outside_of_default_workspace";
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables are not allowed to be created outside of default temporary workspace [%s].",
+          TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_without_schema";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+         "VALIDATION ERROR: A table or view with given name [%s]" +
+             " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception {
+    String temporaryTableName = "temporary_table_exists_without_schema";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase());
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName.toUpperCase(), TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_with_schema";
+    try {
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenPersistentTableExists() throws Exception {
+    String persistentTableName = "persistent_table_exists";
+    try {
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, persistentTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", persistentTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", persistentTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenViewExists() throws Exception {
+    String viewName = "view_exists";
+    try {
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, viewName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", viewName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", viewName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreatePersistentTableWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_persistent";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateViewWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_view";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A non-view table with given name [%s] already exists in schema [%s]",
+          temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test
+  public void testTemporaryAndPersistentTablesPriority() throws Exception {
+    String name = "temporary_and_persistent_table";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create table %s as select 'persistent_table' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+  }
+
+  @Test
+  public void testTemporaryTableAndViewPriority() throws Exception {
+    String name = "temporary_table_and_view";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create view %s as select 'view' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testTemporaryTablesInViewDefinitions() throws Exception {
+    String temporaryTableName = "temporary_table_for_view_definition";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("create view %s.view_with_temp_table as select * from %s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables usage is disallowed. Used temporary table name: [%s]", temporaryTableName)));
+      throw e;
+    }
+  }
+
+  @Test
+  public void testManualDropWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_without_schema";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s", temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  @Test
+  public void testManualDropWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_with_schema";
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  @Test
+  public void testDropTemporaryTableAsViewWithoutException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_without_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop view if exists %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(false, String.format("View [%s] not found in schema [%s].",
+            temporaryTableName, TEMP_SCHEMA))
+        .go();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testDropTemporaryTableAsViewWithException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_with_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("drop view %s.%s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+              "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  private void checkPermission(String tmpTableName) throws IOException {
+    File[] files = findTemporaryTableLocation(tmpTableName);
+    assertEquals("Only one directory should match temporary table name " + tmpTableName, 1, files.length);
+    Path tmpTablePath = new Path(files[0].toURI().getPath());
+    assertEquals("Directory permission should match",
+        expectedFolderPermission, fs.getFileStatus(tmpTablePath).getPermission());
+    RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(tmpTablePath, false);
+    while (fileIterator.hasNext()) {
+      assertEquals("File permission should match", expectedFilePermission, fileIterator.next().getPermission());
+    }
+  }
+
+  private File[] findTemporaryTableLocation(String tableName) throws IOException {
+    File sessionTempLocation = new File(getDfsTestTmpSchemaLocation(), session_id.toString());
+    Path sessionTempLocationPath = new Path(sessionTempLocation.toURI().getPath());
+    assertTrue("Session temporary location must exist", fs.exists(sessionTempLocationPath));
+    assertEquals("Session temporary location permission should match",
+        expectedFolderPermission, fs.getFileStatus(sessionTempLocationPath).getPermission());
+    final String tableUUID = UUID.nameUUIDFromBytes(tableName.getBytes()).toString();
+    return sessionTempLocation.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File path) {
+        return path.isDirectory() && path.getName().equals(tableUUID);
+      }
+    });
+  }
+
+}
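
For reference, the directory name these tests search for is a deterministic
name-based UUID: mockRandomUUID(...) pins the otherwise random table-location
UUID to UUID.nameUUIDFromBytes(tableName). A minimal standalone sketch of the
lookup key the tests expect (names are illustrative):

    import java.util.UUID;

    public class TempTableDirNameDemo {
      public static void main(String[] args) {
        String tableName = "temporary_table_with_partitions";
        // Deterministic name-based UUID, matching findTemporaryTableLocation above.
        String tableUUID = UUID.nameUUIDFromBytes(tableName.getBytes()).toString();
        System.out.println(tableUUID); // the directory name the tests look for
      }
    }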

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
new file mode 100644
index 0000000..6a377ec
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StorageStrategyTest {
+
+  private static final Configuration configuration = new Configuration();
+  private static final FsPermission full_permission = new FsPermission("777");
+  private static final StorageStrategy persistent_strategy = new StorageStrategy("775", "644", false);
+  private static final StorageStrategy temporary_strategy = new StorageStrategy("700", "600", true);
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws Exception {
+    initFileSystem();
+  }
+
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileWithParent() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 2, true);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = persistent_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, file, true, 2, persistent_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, true);
+  }
+
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileWithParent() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 2, true);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = temporary_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, file, true, 2, temporary_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, false);
+  }
+
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileOnly() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+
+    Path createdFile = persistent_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", file, createdFile);
+    checkPathAndPermission(initialPath, file, true, 0, persistent_strategy);
+    checkDeleteOnExit(file, true);
+  }
+
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileOnly() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+
+    Path createdFile = temporary_strategy.createFileAndApply(fs, file);
+
+    assertEquals("Path should match", file, createdFile);
+    checkPathAndPermission(initialPath, file, true, 0, temporary_strategy);
+    checkDeleteOnExit(file, false);
+  }
+
+  @Test(expected = IOException.class)
+  public void testFailureOnExistentFile() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+    fs.createNewFile(file);
+    assertTrue("File should exist", fs.exists(file));
+    try {
+      persistent_strategy.createFileAndApply(fs, file);
+    } catch (IOException e) {
+      assertEquals("Error message should match", String.format("File [%s] already exists on file system [%s].",
+          file.toUri().getPath(), fs.getUri()), e.getMessage());
+      throw e;
+    }
+  }
+
+  @Test
+  public void testCreatePathAndDeleteOnExitFalse() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = persistent_strategy.createPathAndApply(fs, resultPath);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, resultPath, false, 2, persistent_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, true);
+  }
+
+  @Test
+  public void testCreatePathAndDeleteOnExitTrue() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path resultPath = addNLevelsAndFile(initialPath, 2, false);
+    Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false);
+
+    Path createdParentPath = temporary_strategy.createPathAndApply(fs, resultPath);
+
+    assertEquals("Path should match", firstCreatedParentPath, createdParentPath);
+    checkPathAndPermission(initialPath, resultPath, false, 2, temporary_strategy);
+    checkDeleteOnExit(firstCreatedParentPath, false);
+  }
+
+  @Test
+  public void testCreateNoPath() throws Exception {
+    Path path = prepareStorageDirectory();
+
+    Path createdParentPath = temporary_strategy.createPathAndApply(fs, path);
+
+    assertNull("Path should be null", createdParentPath);
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(path).getPermission());
+  }
+
+  @Test
+  public void testStrategyForExistingFile() throws Exception {
+    Path initialPath = prepareStorageDirectory();
+    Path file = addNLevelsAndFile(initialPath, 0, true);
+    fs.createNewFile(file);
+    fs.setPermission(file, full_permission);
+
+    assertTrue("File should exist", fs.exists(file));
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(file).getPermission());
+
+    temporary_strategy.applyToFile(fs, file);
+
+    assertEquals("Permission should match", new FsPermission(temporary_strategy.getFilePermission()),
+        fs.getFileStatus(file).getPermission());
+    checkDeleteOnExit(file, false);
+  }
+
+  private Path prepareStorageDirectory() throws IOException {
+    File storageDirectory = Files.createTempDir();
+    storageDirectory.deleteOnExit();
+    Path path = new Path(storageDirectory.toURI().getPath());
+    fs.setPermission(path, full_permission);
+    return path;
+  }
+
+  private void initFileSystem() throws IOException {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (Exception e) {
+        // do nothing
+      }
+    }
+    fs = FileSystem.get(configuration);
+  }
+
+  private Path addNLevelsAndFile(Path initialPath, int levels, boolean addFile) {
+    Path resultPath = initialPath;
+    for (int i = 1; i <= levels; i++) {
+      resultPath = new Path(resultPath, "level" + i);
+    }
+    if (addFile) {
+      resultPath = new Path(resultPath, "test_file.txt");
+    }
+    return resultPath;
+  }
+
+  private void checkPathAndPermission(Path initialPath,
+                                      Path resultPath,
+                                      boolean isFile,
+                                      int levels,
+                                      StorageStrategy storageStrategy) throws IOException {
+
+    assertEquals("Path type should match", isFile, fs.isFile(resultPath));
+    assertEquals("Permission should match", full_permission, fs.getFileStatus(initialPath).getPermission());
+
+    if (isFile) {
+      assertEquals("Permission should match", new FsPermission(storageStrategy.getFilePermission()),
+          fs.getFileStatus(resultPath).getPermission());
+    }
+    Path startingPath = initialPath;
+    FsPermission folderPermission = new FsPermission(storageStrategy.getFolderPermission());
+    for (int i = 1; i <= levels; i++) {
+      startingPath = new Path(startingPath, "level" + i);
+      assertEquals("Permission should match", folderPermission, fs.getFileStatus(startingPath).getPermission());
+    }
+  }
+
+  private void checkDeleteOnExit(Path path, boolean isPresent) throws IOException {
+    assertTrue("Path should be present", fs.exists(path));
+    // close and open file system to check for path presence
+    initFileSystem();
+    assertEquals("Path existence flag should match", isPresent, fs.exists(path));
+  }
+}
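
A minimal usage sketch of the StorageStrategy API these tests exercise
(assuming a local Hadoop FileSystem; the path is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.drill.exec.store.StorageStrategy;

    public class StorageStrategyUsageDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // "700" for folders, "600" for files, delete created paths on file system close.
        StorageStrategy temporary = new StorageStrategy("700", "600", true);
        // Creates missing parent folders, applies folder permissions and marks
        // created folders for deletion on fs.close(); returns the first folder
        // that had to be created (null if the path already existed).
        Path firstCreated = temporary.createPathAndApply(fs, new Path("/tmp/demo/session/table"));
        System.out.println("First created parent: " + firstCreated);
      }
    }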

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 7b977e2..35ca26b 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -9,7 +9,7 @@
           writable: false
         },
         "tmp" : {
-          location: "/tmp/drilltest",
+          location: "/tmp",
           writable: true
         }
       },

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index fbacd23..cd97ab7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -176,6 +176,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   }
 
   /**
+   * Closes all resources associated with the current session.
+   * The default implementation is a no-op.
+   */
+  public void closeSession() {
+  }
+
+  /**
    * Connection consumer wants to close connection. Initiate connection close
    * and complete. This is a blocking call that ensures that the connection is
    * closed before returning. As part of this call, the channel close handler

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index c360e51..cdb9c07 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -164,7 +164,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       }
 
       final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
-      clientConnection.channelClosed(ex);
+      try {
+        clientConnection.closeSession();
+      } finally {
+        clientConnection.channelClosed(ex);
+      }
     }
 
   }
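
The try/finally ordering above matters: session cleanup runs first, and the
channel-closed handling still runs even if cleanup throws. A minimal sketch of
the pattern (hypothetical interface, not the actual RpcBus types):

    interface SessionAwareConnection {
      void closeSession();                 // releases session-scoped resources
      void channelClosed(Exception cause); // completes the channel close
    }

    final class CloseOrderingDemo {
      static void onChannelClosed(SessionAwareConnection connection, Exception cause) {
        try {
          connection.closeSession();
        } finally {
          connection.channelClosed(cause);
        }
      }
    }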


[5/5] drill git commit: DRILL-5164: Equi-join query results in CompileException when inputs have large number of columns.

Posted by jn...@apache.org.
DRILL-5164: Equi-join query results in CompileException when inputs have large number of columns.

close apache/drill#711


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2af709f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2af709f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2af709f4

Branch: refs/heads/master
Commit: 2af709f43d01f341b2a52c6473ea49d6761fdc61
Parents: 8217697
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Tue Dec 27 16:20:37 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:58 2017 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/HashJoinBatch.java  |  4 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  2 +
 .../physical/impl/join/NestedLoopJoinBatch.java |  4 +-
 .../exec/compile/TestLargeFileCompilation.java  | 94 +++++++++++++++++---
 4 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
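
The rotateBlock() calls added below are what resolve the CompileException:
generated per-column copy statements otherwise accumulate into a single
generated method, and with thousands of columns that method exceeds the JVM's
64KB-per-method bytecode limit. Rotating starts a fresh generated method. An
illustrative sketch of the idea (not Drill's actual ClassGenerator API):

    // Emits one statement per column, starting a new method every N statements
    // so no single generated method grows past the bytecode limit.
    final class MethodRotationDemo {
      private static final int MAX_STATEMENTS_PER_METHOD = 100; // assumed threshold
      private final StringBuilder source = new StringBuilder();
      private int statements = 0;
      private int methods = 0;

      void emitCopy(int fieldId) {
        if (statements == 0) {
          source.append("  private void doCopy").append(methods).append("() {\n");
        }
        source.append("    copyColumn(").append(fieldId).append(");\n");
        if (++statements == MAX_STATEMENTS_PER_METHOD) {
          rotate(); // analogous in spirit to ClassGenerator.rotateBlock()
        }
      }

      void rotate() {
        if (statements > 0) {
          source.append("  }\n");
          statements = 0;
          methods++;
        }
      }

      String finish() {
        rotate();
        return source.toString();
      }
    }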


http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 23741b0..f1f81fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -439,7 +439,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
             .arg(outIndex)
             .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
-
+        g.rotateBlock();
         fieldId++;
       }
     }
@@ -475,7 +475,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
         g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
-
+        g.rotateBlock();
         fieldId++;
         outputFieldId++;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a9bb479..c351517 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -345,6 +345,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           .arg(copyLeftMapping.getValueReadIndex())
           .arg(copyLeftMapping.getValueWriteIndex())
           .arg(vvIn));
+        cg.rotateBlock();
         ++vectorId;
       }
     }
@@ -373,6 +374,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           .arg(copyRightMappping.getValueReadIndex())
           .arg(copyRightMappping.getValueWriteIndex())
           .arg(vvIn));
+        cg.rotateBlock();
         ++vectorId;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2e92c8d..bdd9f0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -247,7 +247,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId));
 
       nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
-
+      nLJClassGenerator.rotateBlock();
       fieldId++;
       outputFieldId++;
     }
@@ -270,7 +270,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
           .arg(recordIndexWithinBatch)
           .arg(outIndex)
           .arg(inVV.component(batchIndex)));
-
+      nLJClassGenerator.rotateBlock();
       fieldId++;
       outputFieldId++;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 35bd4c9..8416d73 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -39,27 +39,33 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   private static final String LARGE_QUERY_SELECT_LIST;
 
+  private static final String QUERY_WITH_JOIN;
+
+  private static final String LARGE_TABLE_WRITER;
+
   private static final int ITERATION_COUNT = Integer.valueOf(System.getProperty("TestLargeFileCompilation.iteration", "1"));
 
-  private static final int NUM_PROJECT_COULMNS = 2000;
+  private static final int NUM_PROJECT_COLUMNS = 2000;
+
+  private static final int NUM_ORDERBY_COLUMNS = 500;
 
-  private static final int NUM_ORDERBY_COULMNS = 500;
+  private static final int NUM_GROUPBY_COLUMNS = 225;
 
-  private static final int NUM_GROUPBY_COULMNS = 225;
+  private static final int NUM_FILTER_COLUMNS = 150;
 
-  private static final int NUM_FILTER_COULMNS = 150;
+  private static final int NUM_JOIN_TABLE_COLUMNS = 500;
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("c").append(i).append(", ");
     }
     sb.append("full_name\nfrom (select\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as c").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`)\ngroup by\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("c").append(i).append(", ");
     }
     LARGE_QUERY_GROUP_BY = sb.append("full_name").toString();
@@ -67,7 +73,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`\n\n\t");
@@ -76,11 +82,11 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`\norder by\n\t");
-    for (int i = 0; i < NUM_ORDERBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_ORDERBY_COLUMNS; i++) {
       sb.append(" col").append(i).append(", ");
     }
     LARGE_QUERY_ORDER_BY = sb.append("full_name").toString();
@@ -91,18 +97,24 @@ public class TestLargeFileCompilation extends BaseTestQuery {
     StringBuilder sb = new StringBuilder("select *\n")
       .append("from cp.`employee.json`\n")
       .append("where");
-    for (int i = 0; i < NUM_FILTER_COULMNS; i++) {
+    for (int i = 0; i < NUM_FILTER_COLUMNS; i++) {
       sb.append(" employee_id+").append(i).append(" < employee_id ").append(i%2==0?"OR":"AND");
     }
     LARGE_QUERY_FILTER = sb.append(" true") .toString();
   }
 
   static {
+    LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
+    LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
+    QUERY_WITH_JOIN = "select * from %1$s t1, %1$s t2 where t1.col1 = t2.col1";
+  }
+
+  private static String createTableWithColsCount(int columnsCount) {
     StringBuilder sb = new StringBuilder("create table %s as (select \n");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < columnsCount; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
-    LARGE_QUERY_WRITER = sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
+    return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
   }
 
   @Test
@@ -150,4 +162,60 @@ public class TestLargeFileCompilation extends BaseTestQuery {
     testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
     testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST);
   }
+
+  @Test
+  public void testHashJoin() throws Exception {
+    String tableName = "wide_table_hash_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_mergejoin` = false");
+      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_mergejoin`");
+      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testMergeJoin() throws Exception {
+    String tableName = "wide_table_merge_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_hashjoin` = false");
+      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_hashjoin`");
+      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testNestedLoopJoin() throws Exception {
+    String tableName = "wide_table_loop_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false");
+      testNoResult("alter session set `planner.enable_hashjoin` = false");
+      testNoResult("alter session set `planner.enable_mergejoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_nljoin_for_scalar_only`");
+      testNoResult("alter session reset `planner.enable_hashjoin`");
+      testNoResult("alter session reset `planner.enable_mergejoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
 }


[2/5] drill git commit: DRILL-4956: Temporary tables support

Posted by jn...@apache.org.
DRILL-4956: Temporary tables support

close apache/drill#666


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bb29f19f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bb29f19f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bb29f19f

Branch: refs/heads/master
Commit: bb29f19ff8807fd07cdaa9e7110c1a003b3da15b
Parents: 8a4d7a9
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Nov 3 16:55:38 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:06 2017 -0800

----------------------------------------------------------------------
 .../src/resources/drill-override-example.conf   |   8 +-
 .../src/main/codegen/includes/parserImpls.ftl   |   9 +-
 .../org/apache/drill/exec/ExecConstants.java    |   7 +-
 .../exec/physical/base/AbstractWriter.java      |  17 +-
 .../exec/physical/impl/WriterRecordBatch.java   |  22 +-
 .../logical/FileSystemCreateTableEntry.java     |  18 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   9 +-
 .../drill/exec/planner/sql/SchemaUtilites.java  |  42 +-
 .../drill/exec/planner/sql/SqlConverter.java    | 141 +++++--
 .../sql/handlers/CreateTableHandler.java        | 111 ++++-
 .../planner/sql/handlers/DropTableHandler.java  |  65 ++-
 .../planner/sql/handlers/SqlHandlerUtil.java    |  49 ++-
 .../exec/planner/sql/handlers/ViewHandler.java  |  63 +--
 .../sql/parser/CompoundIdentifierConverter.java |   4 +-
 .../exec/planner/sql/parser/SqlCreateTable.java |  28 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |  13 +-
 .../apache/drill/exec/rpc/user/UserSession.java | 170 +++++++-
 .../org/apache/drill/exec/server/Drillbit.java  |  23 +-
 .../apache/drill/exec/store/AbstractSchema.java |  21 +-
 .../drill/exec/store/SchemaTreeProvider.java    |  32 +-
 .../drill/exec/store/StorageStrategy.java       | 194 +++++++++
 .../drill/exec/store/SubSchemaWrapper.java      |   6 +-
 .../drill/exec/store/dfs/FileSelection.java     |   5 +-
 .../exec/store/dfs/FileSystemSchemaFactory.java |  10 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  10 +-
 .../drill/exec/store/dfs/easy/EasyWriter.java   |  10 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |   4 +-
 .../exec/store/easy/json/JsonRecordWriter.java  |  24 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |   4 +-
 .../exec/store/parquet/ParquetRecordWriter.java |  64 ++-
 .../drill/exec/store/parquet/ParquetWriter.java |  10 +-
 .../exec/store/text/DrillTextRecordWriter.java  |  31 +-
 .../src/main/resources/drill-module.conf        |   8 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   3 +-
 .../java/org/apache/drill/TestDropTable.java    |  10 +-
 .../user/TemporaryTablesAutomaticDropTest.java  |  95 +++++
 .../drill/exec/sql/TestBaseViewSupport.java     |   4 +-
 .../org/apache/drill/exec/sql/TestCTTAS.java    | 422 +++++++++++++++++++
 .../drill/exec/store/StorageStrategyTest.java   | 222 ++++++++++
 .../resources/bootstrap-storage-plugins.json    |   2 +-
 .../apache/drill/exec/rpc/RemoteConnection.java |   9 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |   8 +-
 42 files changed, 1797 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index f9d27b3..3baac5e 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -184,7 +184,13 @@ drill.exec: {
       # Set this property if custom absolute root should be used for remote directories
       root: "/app/drill"
     }
-  }
+  },
+  # Settings for Temporary Tables.
+  # See https://gist.github.com/arina-ielchiieva/50158175867a18eee964b5ba36455fbf#file-temporarytablessupport-md.
+  # Temporary tables can be created ONLY in the default temporary workspace.
+  # The full workspace name must be specified (schema and workspace separated by a dot).
+  # Workspace MUST be file-based and writable. Workspace name is case-sensitive.
+  default_temporary_workspace: "dfs.tmp"
 }
 
 # Below SSL parameters need to be set for custom transport layer settings.

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 0017446..d9ceed9 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -214,8 +214,8 @@ SqlNode SqlDropView() :
 }
 
 /**
- * Parses a CTAS statement.
- * CREATE TABLE tblname [ (field1, field2, ...) ] AS select_statement.
+ * Parses a CTAS or CTTAS statement.
+ * CREATE [TEMPORARY] TABLE tblname [ (field1, field2, ...) ] AS select_statement.
  */
 SqlNode SqlCreateTable() :
 {
@@ -224,12 +224,14 @@ SqlNode SqlCreateTable() :
     SqlNodeList fieldList;
     SqlNodeList partitionFieldList;
     SqlNode query;
+    boolean isTemporary = false;
 }
 {
     {
         partitionFieldList = SqlNodeList.EMPTY;
     }
     <CREATE> { pos = getPos(); }
+    ( <TEMPORARY> { isTemporary = true; } )?
     <TABLE>
     tblName = CompoundIdentifier()
     fieldList = ParseOptionalFieldList("Table")
@@ -239,7 +241,8 @@ SqlNode SqlCreateTable() :
     <AS>
     query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateTable(pos, tblName, fieldList, partitionFieldList, query);
+        return new SqlCreateTable(pos, tblName, fieldList, partitionFieldList, query,
+                                    SqlLiteral.createBoolean(isTemporary, getPos()));
     }
 }
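
Statements accepted by the grammar above, in the BaseTestQuery style used by
TestCTTAS (table names are illustrative):

    @Test
    public void cttasSyntaxExamples() throws Exception {
      // Plain CTAS is unchanged; TEMPORARY enables the new CTTAS form.
      test("create table dfs.tmp.demo_persistent as select 'A' as c1 from (values(1))");
      test("create TEMPORARY table demo_tmp as select 'A' as c1 from (values(1))");
      test("create TEMPORARY table demo_tmp_part partition by (c1) as select 'A' as c1 from (values(1))");
    }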
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 740eb4b..e8cc75c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -128,6 +128,11 @@ public interface ExecConstants {
    */
   String DRILL_TMP_DIR = "drill.tmp-dir";
 
+  /**
+   * Temporary tables can be created ONLY in the default temporary workspace.
+   */
+  String DEFAULT_TEMPORARY_WORKSPACE = "drill.exec.default_temporary_workspace";
+
   String OUTPUT_FORMAT_OPTION = "store.format";
   OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
   String PARQUET_BLOCK_SIZE = "store.parquet.block-size";

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
index af23d5f..6ba570b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,12 @@
  */
 package org.apache.drill.exec.physical.base;
 
-public abstract class AbstractWriter extends AbstractSingle implements Writer{
+import org.apache.drill.exec.store.StorageStrategy;
+
+public abstract class AbstractWriter extends AbstractSingle implements Writer {
+
+  /** Storage strategy used during table folder and file creation. */
+  private StorageStrategy storageStrategy;
 
   public AbstractWriter(PhysicalOperator child) {
     super(child);
@@ -27,4 +32,12 @@ public abstract class AbstractWriter extends AbstractSingle implements Writer{
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitWriter(this, value);
   }
+
+  public void setStorageStrategy(StorageStrategy storageStrategy) {
+    this.storageStrategy = storageStrategy;
+  }
+
+  public StorageStrategy getStorageStrategy() {
+    return storageStrategy;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index e6c946c..939832b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -174,13 +174,25 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     schema = container.getSchema();
   }
 
+  /** Cleans up the writer before it is closed. If writing did not complete, partially written data is removed. */
   private void closeWriter() {
-    if (recordWriter != null) {
+    if (recordWriter == null) {
+      return;
+    }
+
+    try {
+      recordWriter.cleanup();
+    } catch(IOException ex) {
+      context.fail(ex);
+    } finally {
       try {
-        recordWriter.cleanup();
+        if (!processed) {
+          recordWriter.abort();
+        }
+      } catch (IOException e) {
+        logger.error("Abort failed. There could be leftover output files.", e);
+      } finally {
         recordWriter = null;
-      } catch(IOException ex) {
-        context.fail(ex);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 90eb05c..23ea23f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,9 +22,10 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.physical.base.Writer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -34,7 +35,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.store.ischema.Records;
 
 /**
  * Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage.
@@ -47,28 +47,33 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
   private FormatPlugin formatPlugin;
   private String location;
   private final List<String> partitionColumns;
+  private final StorageStrategy storageStrategy;
 
   @JsonCreator
   public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
                                     @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
                                     @JsonProperty("location") String location,
                                     @JsonProperty("partitionColumn") List<String> partitionColumns,
+                                    @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
                                     @JacksonInject StoragePluginRegistry engineRegistry)
       throws ExecutionSetupException {
     this.storageConfig = storageConfig;
     this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     this.location = location;
     this.partitionColumns = partitionColumns;
+    this.storageStrategy = storageStrategy;
   }
 
   public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
                                     FormatPlugin formatPlugin,
                                     String location,
-                                    List<String> partitionColumns) {
+                                    List<String> partitionColumns,
+                                    StorageStrategy storageStrategy) {
     this.storageConfig = storageConfig;
     this.formatPlugin = formatPlugin;
     this.location = location;
     this.partitionColumns = partitionColumns;
+    this.storageStrategy = storageStrategy;
   }
 
   @JsonProperty("storageConfig")
@@ -89,11 +94,14 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
           formatPlugin.getName())).build(logger);
     }
 
-    return formatPlugin.getWriter(child, location, partitionColumns);
+    AbstractWriter writer = formatPlugin.getWriter(child, location, partitionColumns);
+    writer.setStorageStrategy(storageStrategy);
+    return writer;
   }
 
   @Override
   public List<String> getPartitionColumns() {
     return partitionColumns;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 76529d4..0ad3944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -58,12 +58,7 @@ public class DrillSqlWorker {
   public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
       throws ForemanSetupException {
 
-    final SqlConverter parser = new SqlConverter(
-        context.getPlannerSettings(),
-        context.getNewDefaultSchema(),
-        context.getDrillOperatorTable(),
-        (UdfUtilities) context,
-        context.getFunctionRegistry());
+    final SqlConverter parser = new SqlConverter(context);
 
     injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
     final SqlNode sqlNode = parser.parse(sql);

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index 085f808..20c92c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,8 +22,12 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -116,6 +120,11 @@ public class SchemaUtilites {
     return SCHEMA_PATH_JOINER.join(getSchemaPathAsList(schema));
   }
 
+  /** Utility method to get the schema path for a given list of schema path segments. */
+  public static String getSchemaPath(List<String> schemaPath) {
+    return SCHEMA_PATH_JOINER.join(schemaPath);
+  }
+
   /** Utility method to get the schema path as list for given schema instance. */
   public static List<String> getSchemaPathAsList(SchemaPlus schema) {
     if (isRootSchema(schema)) {
@@ -177,4 +186,35 @@ public class SchemaUtilites {
 
     return drillSchema;
   }
+
+  /**
+   * Looks in the schema tree for the default temporary workspace instance.
+   * Makes sure the temporary workspace is mutable and file-based
+   * (an instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}).
+   *
+   * @param defaultSchema default schema
+   * @param config drill config
+   * @return default temporary workspace
+   */
+  public static AbstractSchema getTemporaryWorkspace(SchemaPlus defaultSchema, DrillConfig config) {
+    List<String> temporarySchemaPath = Lists.newArrayList(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE));
+    AbstractSchema temporarySchema = resolveToMutableDrillSchema(defaultSchema, temporarySchemaPath);
+    if (!(temporarySchema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
+      DrillRuntimeException.format("Temporary workspace [%s] must be file-based, instance of " +
+          "WorkspaceSchemaFactory.WorkspaceSchema", temporarySchemaPath);
+    }
+    return temporarySchema;
+  }
+
+  /**
+   * Checks whether the given schema path is the same as the temporary workspace path.
+   * The check is case-sensitive.
+   *
+   * @param schemaPath schema path
+   * @param config drill config
+   * @return true if the schema path corresponds to the temporary workspace, false otherwise
+   */
+  public static boolean isTemporaryWorkspace(String schemaPath, DrillConfig config) {
+    return schemaPath.equals(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE));
+  }
 }
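
A sketch of how these two helpers combine to enforce the rule exercised by
TestCTTAS (hypothetical method, not the actual CreateTableHandler code; the
logger is assumed to be in scope):

    static void checkTemporaryTableLocation(boolean isTemporary, String resolvedSchemaPath, DrillConfig config) {
      if (isTemporary && !SchemaUtilites.isTemporaryWorkspace(resolvedSchemaPath, config)) {
        throw UserException.validationError()
            .message("Temporary tables are not allowed to be created outside of default temporary workspace [%s].",
                config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE))
            .build(logger);
      }
    }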

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 0c3c6a0..28196c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,10 +21,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.avatica.util.Casing;
 import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.CalciteSchemaImpl;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -33,6 +35,7 @@ import org.apache.calcite.plan.RelOptCostFactory;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
@@ -41,17 +44,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.AggregatingSelectScope;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorException;
@@ -61,18 +61,21 @@ import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.FunctionNotFoundException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
 
 import com.google.common.base.Joiner;
+import org.apache.drill.exec.rpc.user.UserSession;
 
 /**
  * Class responsible for managing parsing, validation and toRel conversion for sql statements.
@@ -86,7 +89,7 @@ public class SqlConverter {
   private final SqlParser.Config parserConfig;
   // Allow the default config to be modified using immutable configs
   private SqlToRelConverter.Config sqlToRelConverterConfig;
-  private final CalciteCatalogReader catalog;
+  private final DrillCalciteCatalogReader catalog;
   private final PlannerSettings settings;
   private final SchemaPlus rootSchema;
   private final SchemaPlus defaultSchema;
@@ -96,35 +99,42 @@ public class SqlConverter {
   private final boolean isInnerQuery;
   private final UdfUtilities util;
   private final FunctionImplementationRegistry functions;
+  private final String temporarySchema;
+  private final UserSession session;
+  private final DrillConfig drillConfig;
 
   private String sql;
   private VolcanoPlanner planner;
 
 
-  public SqlConverter(PlannerSettings settings, SchemaPlus defaultSchema,
-      final SqlOperatorTable operatorTable, UdfUtilities util, FunctionImplementationRegistry functions) {
-    this.settings = settings;
-    this.util = util;
-    this.functions = functions;
+  public SqlConverter(QueryContext context) {
+    this.settings = context.getPlannerSettings();
+    this.util = (UdfUtilities) context;
+    this.functions = context.getFunctionRegistry();
     this.parserConfig = new ParserConfig();
     this.sqlToRelConverterConfig = new SqlToRelConverterConfig();
     this.isInnerQuery = false;
     this.typeFactory = new JavaTypeFactoryImpl(DRILL_TYPE_SYSTEM);
-    this.defaultSchema = defaultSchema;
+    this.defaultSchema = context.getNewDefaultSchema();
     this.rootSchema = rootSchema(defaultSchema);
-    this.catalog = new CalciteCatalogReader(
+    this.temporarySchema = context.getConfig().getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE);
+    this.session = context.getSession();
+    this.drillConfig = context.getConfig();
+    this.catalog = new DrillCalciteCatalogReader(
         CalciteSchemaImpl.from(rootSchema),
         parserConfig.caseSensitive(),
         CalciteSchemaImpl.from(defaultSchema).path(null),
-        typeFactory);
-    this.opTab = new ChainedSqlOperatorTable(Arrays.asList(operatorTable, catalog));
+        typeFactory,
+        drillConfig,
+        session);
+    this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
     this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
     this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT);
     validator.setIdentifierExpansion(true);
   }
 
   private SqlConverter(SqlConverter parent, SchemaPlus defaultSchema, SchemaPlus rootSchema,
-      CalciteCatalogReader catalog) {
+      DrillCalciteCatalogReader catalog) {
     this.parserConfig = parent.parserConfig;
     this.sqlToRelConverterConfig = parent.sqlToRelConverterConfig;
     this.defaultSchema = defaultSchema;
@@ -139,6 +149,9 @@ public class SqlConverter {
     this.opTab = parent.opTab;
     this.planner = parent.planner;
     this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT);
+    this.temporarySchema = parent.temporarySchema;
+    this.session = parent.session;
+    this.drillConfig = parent.drillConfig;
     validator.setIdentifierExpansion(true);
   }
 
@@ -203,6 +216,11 @@ public class SqlConverter {
     return defaultSchema;
   }
 
+  /** Disallows the use of temporary tables in a sql statement (e.g. in view definitions). */
+  public void disallowTemporaryTables() {
+    catalog.disallowTemporaryTables();
+  }
+
   private class DrillValidator extends SqlValidatorImpl {
     private final Set<SqlValidatorScope> identitySet = Sets.newIdentityHashSet();
 
@@ -272,26 +290,27 @@ public class SqlConverter {
     public Expander() {
     }
 
-    public RelNode expandView(
-        RelDataType rowType,
-        String queryString,
-        List<String> schemaPath) {
-      SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema,
-          catalog.withSchemaPath(schemaPath));
+    public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+      final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
+          CalciteSchemaImpl.from(rootSchema),
+          parserConfig.caseSensitive(),
+          schemaPath,
+          typeFactory,
+          drillConfig,
+          session);
+      final SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema, catalogReader);
       return expandView(queryString, parser);
     }
 
     @Override
-    public RelNode expandView(
-        RelDataType rowType,
-        String queryString,
-        SchemaPlus rootSchema, // new root schema
-        List<String> schemaPath) {
-      final CalciteCatalogReader catalogReader = new CalciteCatalogReader(
-          CalciteSchemaImpl.from(rootSchema),
+    public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
+      final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
+          CalciteSchemaImpl.from(rootSchema), // new root schema
           parserConfig.caseSensitive(),
           schemaPath,
-          typeFactory);
+          typeFactory,
+          drillConfig,
+          session);
       SchemaPlus schema = rootSchema;
       for (String s : schemaPath) {
         SchemaPlus newSchema = schema.getSubSchema(s);
@@ -447,4 +466,66 @@ public class SqlConverter {
       return node;
     }
   }
+
+  /**
+   * Extension of {@link CalciteCatalogReader} that checks for temporary tables first
+   * when no schema is indicated before the table name during query parsing,
+   * or when the indicated workspace is the default temporary workspace.
+   */
+  private class DrillCalciteCatalogReader extends CalciteCatalogReader {
+
+    private final DrillConfig drillConfig;
+    private final UserSession session;
+    private boolean allowTemporaryTables;
+
+    DrillCalciteCatalogReader(CalciteSchema rootSchema,
+                              boolean caseSensitive,
+                              List<String> defaultSchema,
+                              JavaTypeFactory typeFactory,
+                              DrillConfig drillConfig,
+                              UserSession session) {
+      super(rootSchema, caseSensitive, defaultSchema, typeFactory);
+      this.drillConfig = drillConfig;
+      this.session = session;
+      this.allowTemporaryTables = true;
+    }
+
+    /** Disallows the use of temporary tables in a sql statement (e.g. in view definitions). */
+    public void disallowTemporaryTables() {
+      this.allowTemporaryTables = false;
+    }
+
+    /**
+     * If no schema is indicated (only one element in the list) or the schema is the default temporary workspace,
+     * session temporary tables are checked first in the default temporary workspace.
+     * If a temporary table is found and temporary table usage is allowed, its table instance is returned,
+     * otherwise the search is conducted in the original workspace.
+     *
+     * @param names list of schema and table names, table name is always the last element
+     * @return table instance if found, null otherwise
+     * @throws UserException if temporary table usage is disallowed
+     */
+    @Override
+    public RelOptTableImpl getTable(final List<String> names) {
+      RelOptTableImpl temporaryTable = null;
+      String schemaPath = SchemaUtilites.getSchemaPath(names.subList(0, names.size() - 1));
+      if (names.size() == 1 || SchemaUtilites.isTemporaryWorkspace(schemaPath, drillConfig)) {
+        String temporaryTableName = session.resolveTemporaryTableName(names.get(names.size() - 1));
+        if (temporaryTableName != null) {
+          List<String> temporaryNames = Lists.newArrayList(temporarySchema, temporaryTableName);
+          temporaryTable = super.getTable(temporaryNames);
+        }
+      }
+      if (temporaryTable != null) {
+        if (allowTemporaryTables) {
+          return temporaryTable;
+        }
+        throw UserException
+            .validationError()
+            .message("Temporary tables usage is disallowed. Used temporary table name: %s.", names)
+            .build(logger);
+      }
+      return super.getTable(names);
+    }
+  }
 }
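
The catalog reader's lookup order can be modeled in isolation. Below is a stand-alone sketch, with a plain concurrent map standing in for the session's temporary table cache; it is not the real DrillCalciteCatalogReader:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class TemporaryTableLookupSketch {
      /** Maps original (lower-cased) table names to generated, session-scoped names. */
      private final ConcurrentMap<String, String> temporaryTables = new ConcurrentHashMap<>();

      /** Resolves to the generated name when registered, else falls back to the original name. */
      String resolve(String tableName) {
        String generated = temporaryTables.get(tableName.toLowerCase());
        return generated != null ? generated : tableName;
      }
    }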

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index b6ffde6..12c72c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,13 +31,17 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -67,43 +71,54 @@ public class CreateTableHandler extends DefaultSqlHandler {
   @Override
   public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
     SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class);
-    final String newTblName = sqlCreateTable.getName();
+    String originalTableName = sqlCreateTable.getName();
 
     final ConvertedRelNode convertedRelNode = validateAndConvert(sqlCreateTable.getQuery());
     final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
     final RelNode queryRelNode = convertedRelNode.getConvertedNode();
 
-
     final RelNode newTblRelNode =
         SqlHandlerUtil.resolveNewTableRel(false, sqlCreateTable.getFieldNames(), validatedRowType, queryRelNode);
 
-    final AbstractSchema drillSchema =
-        SchemaUtilites.resolveToMutableDrillSchema(config.getConverter().getDefaultSchema(),
-            sqlCreateTable.getSchemaPath());
-    final String schemaPath = drillSchema.getFullSchemaName();
+    final DrillConfig drillConfig = context.getConfig();
+    final AbstractSchema drillSchema = resolveSchema(sqlCreateTable, config.getConverter().getDefaultSchema(), drillConfig);
 
-    if (SqlHandlerUtil.getTableFromSchema(drillSchema, newTblName) != null) {
-      throw UserException.validationError()
-          .message("A table or view with given name [%s] already exists in schema [%s]", newTblName, schemaPath)
-          .build(logger);
-    }
+    checkDuplicatedObjectExistence(drillSchema, originalTableName, drillConfig, context.getSession());
 
-    final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode, sqlCreateTable.getPartitionColumns());
+    final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode,
+        sqlCreateTable.getPartitionColumns());
 
     log("Calcite", newTblRelNodeWithPCol, logger, null);
-
     // Convert the query to Drill Logical plan and insert a writer operator on top.
-    DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTblName, sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType());
+    StorageStrategy storageStrategy = sqlCreateTable.isTemporary() ?
+        StorageStrategy.TEMPORARY : StorageStrategy.PERSISTENT;
+
+    // If we are creating a temporary table, the initial table name is replaced with a generated one.
+    // The generated table name is unique; UUID.randomUUID() is used for its generation.
+    // The original table name is stored in the temporary tables cache, so it can be substituted with the generated one during querying.
+    String newTableName = sqlCreateTable.isTemporary() ?
+        context.getSession().registerTemporaryTable(drillSchema, originalTableName) : originalTableName;
+
+    DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTableName,
+        sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType(), storageStrategy);
     Prel prel = convertToPrel(drel, newTblRelNode.getRowType(), sqlCreateTable.getPartitionColumns());
     logAndSetTextPlan("Drill Physical", prel, logger);
     PhysicalOperator pop = convertToPop(prel);
     PhysicalPlan plan = convertToPlan(pop);
     log("Drill Plan", plan, logger);
 
+    String message = String.format("Creating %s table [%s].",
+        sqlCreateTable.isTemporary() ? "temporary" : "persistent", originalTableName);
+    logger.info(message);
     return plan;
   }
 
-  private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName, List<String> partitionColumns, RelDataType queryRowType)
+  private DrillRel convertToDrel(RelNode relNode,
+                                 AbstractSchema schema,
+                                 String tableName,
+                                 List<String> partitionColumns,
+                                 RelDataType queryRowType,
+                                 StorageStrategy storageStrategy)
       throws RelConversionException, SqlUnsupportedException {
     final DrillRel convertedRelNode = convertToDrel(relNode);
 
@@ -114,7 +129,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
 
     final RelTraitSet traits = convertedRelNode.getCluster().traitSet().plus(DrillRel.DRILL_LOGICAL);
     final DrillWriterRel writerRel = new DrillWriterRel(convertedRelNode.getCluster(),
-        traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns));
+        traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns, storageStrategy));
     return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
   }
 
@@ -186,7 +201,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
         return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
             Collections.singletonList( (RelNode) projectUnderWriter));
       } else {
-        // find list of partiiton columns.
+        // find list of partition columns.
         final List<RexNode> partitionColumnExprs = Lists.newArrayListWithExpectedSize(partitionColumns.size());
         for (final String colName : partitionColumns) {
           final RelDataTypeField field = childRowType.getField(colName, false, false);
@@ -242,4 +257,62 @@ public class CreateTableHandler extends DefaultSqlHandler {
     return node;
   }
 
+  /**
+   * Resolves the schema taking into account the type of table being created.
+   * If the schema path wasn't indicated in the sql call and the table to be created is temporary,
+   * returns the temporary workspace.
+   *
+   * If the schema path is indicated, resolves to a mutable drill schema.
+   * If the table to be created is temporary, also checks that the resolved schema is the temporary workspace,
+   * since temporary tables are allowed to be created only in the temporary workspace.
+   *
+   * @param sqlCreateTable create table call
+   * @param defaultSchema default schema
+   * @param config drill config
+   * @return resolved schema
+   * @throws UserException if attempted to create temporary table outside of temporary workspace
+   */
+  private AbstractSchema resolveSchema(SqlCreateTable sqlCreateTable, SchemaPlus defaultSchema, DrillConfig config) {
+    if (sqlCreateTable.isTemporary() && sqlCreateTable.getSchemaPath().size() == 0) {
+      return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config);
+    } else {
+      AbstractSchema resolvedSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, sqlCreateTable.getSchemaPath());
+      boolean isTemporaryWorkspace = SchemaUtilites.isTemporaryWorkspace(resolvedSchema.getFullSchemaName(), config);
+
+      if (sqlCreateTable.isTemporary() && !isTemporaryWorkspace) {
+        throw UserException
+            .validationError()
+            .message("Temporary tables are not allowed to be created " +
+                "outside of default temporary workspace [%s].", config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE))
+            .build(logger);
+      }
+      return resolvedSchema;
+    }
+  }
+
+  /**
+   * Checks if any object (persistent table / temporary table / view)
+   * with the same name as table to be created exists in indicated schema.
+   *
+   * @param drillSchema schema where table will be created
+   * @param tableName table name
+   * @param config drill config
+   * @param userSession current user session
+   * @throws UserException if duplicate is found
+   */
+  private void checkDuplicatedObjectExistence(AbstractSchema drillSchema,
+                                              String tableName,
+                                              DrillConfig config,
+                                              UserSession userSession) {
+    String schemaPath = drillSchema.getFullSchemaName();
+    boolean isTemporaryTable = userSession.isTemporaryTable(drillSchema, config, tableName);
+
+    if (isTemporaryTable || SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) {
+      throw UserException
+          .validationError()
+          .message("A table or view with given name [%s] already exists in schema [%s]",
+              tableName, schemaPath)
+          .build(logger);
+    }
+  }
 }
\ No newline at end of file
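
The naming decision in getPlan() reduces to the following sketch. The helper class is hypothetical; the called APIs are the ones added by this patch:

    import java.io.IOException;

    import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
    import org.apache.drill.exec.rpc.user.UserSession;
    import org.apache.drill.exec.store.AbstractSchema;

    class CtasDecisionSketch {
      /** Mirrors CreateTableHandler: temporary tables are written under a generated, session-scoped name. */
      static String chooseTableName(SqlCreateTable call, UserSession session, AbstractSchema schema)
          throws IOException {
        return call.isTemporary()
            ? session.registerTemporaryTable(schema, call.getName()) // generated name, cached per session
            : call.getName();                                        // persistent tables keep their name
      }
    }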

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
index 517c183..a9895db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,19 +18,21 @@
 package org.apache.drill.exec.planner.sql.handlers;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 
 // SqlHandler for dropping a table.
@@ -46,6 +48,7 @@ public class DropTableHandler extends DefaultSqlHandler {
    * Function resolves the schema and invokes the drop method
    * (while IF EXISTS statement is used function invokes the drop method only if table exists).
    * Raises an exception if the schema is immutable.
+   *
    * @param sqlNode - SqlDropTable (SQL parse tree of drop table [if exists] query)
    * @return - Single row indicating drop succeeded or table is not found while IF EXISTS statement is used,
    * raise exception otherwise
@@ -55,35 +58,51 @@ public class DropTableHandler extends DefaultSqlHandler {
    */
   @Override
   public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException {
-
     SqlDropTable dropTableNode = ((SqlDropTable) sqlNode);
-    SqlIdentifier tableIdentifier = dropTableNode.getTableIdentifier();
-
+    String originalTableName = dropTableNode.getName();
     SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
-    AbstractSchema drillSchema = null;
+    List<String> tableSchema = dropTableNode.getSchema();
+    DrillConfig drillConfig = context.getConfig();
+    UserSession session = context.getSession();
 
-    if (tableIdentifier != null) {
-      drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, dropTableNode.getSchema());
-    }
-
-    String tableName = dropTableNode.getName();
-    if (drillSchema == null) {
-      throw UserException.validationError()
-          .message("Invalid table_name [%s]", tableName)
-          .build(logger);
-    }
+    AbstractSchema temporarySchema = resolveToTemporarySchema(tableSchema, defaultSchema, drillConfig);
+    boolean isTemporaryTable = session.isTemporaryTable(temporarySchema, drillConfig, originalTableName);
 
-    if (dropTableNode.checkTableExistence()) {
-      final Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
+    if (isTemporaryTable) {
+      session.removeTemporaryTable(temporarySchema, originalTableName);
+    } else {
+      AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema);
+      Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, originalTableName);
       if (tableToDrop == null || tableToDrop.getJdbcTableType() != Schema.TableType.TABLE) {
-        return DirectPlan.createDirectPlan(context, true,
-            String.format("Table [%s] not found", tableName));
+        if (dropTableNode.checkTableExistence()) {
+          return DirectPlan.createDirectPlan(context, false, String.format("Table [%s] not found", originalTableName));
+        } else {
+          throw UserException.validationError().message("Table [%s] not found", originalTableName).build(logger);
+        }
       }
+      SqlHandlerUtil.dropTableFromSchema(drillSchema, originalTableName);
     }
 
-    drillSchema.dropTable(tableName);
+    String message = String.format("%s [%s] dropped", isTemporaryTable ? "Temporary table" : "Table", originalTableName);
+    logger.info(message);
+    return DirectPlan.createDirectPlan(context, true, message);
+  }
 
-    return DirectPlan.createDirectPlan(context, true,
-        String.format("Table [%s] %s", tableName, "dropped"));
+  /**
+   * If the table schema is not indicated in the sql call, returns the temporary workspace.
+   * If the schema is indicated, resolves to a mutable drill schema.
+   *
+   * @param tableSchema table schema
+   * @param defaultSchema default schema
+   * @param config drill config
+   * @return resolved schema
+   */
+  private AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) {
+    if (tableSchema.size() == 0) {
+      return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config);
+    } else {
+      return SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema);
+    }
   }
+
 }
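
The schema resolution for DROP TABLE can be sketched on its own (hypothetical helper class; the outcomes follow directly from the code above): an empty schema path falls back to the temporary workspace, so session temporary tables are considered before persistent ones.

    import java.util.List;

    import org.apache.calcite.schema.SchemaPlus;
    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.planner.sql.SchemaUtilites;
    import org.apache.drill.exec.store.AbstractSchema;

    class DropTargetSketch {
      /** Mirrors resolveToTemporarySchema(): no schema path means "look in the temporary workspace first". */
      static AbstractSchema resolve(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) {
        return tableSchema.isEmpty()
            ? SchemaUtilites.getTemporaryWorkspace(defaultSchema, config)
            : SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema);
      }
    }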

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index ca7a510..04930a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,31 +22,24 @@ import com.google.common.collect.Sets;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.TypedSqlNode;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
-import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
-import org.apache.drill.exec.planner.sql.DirectPlan;
-import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
 import org.apache.drill.exec.store.AbstractSchema;
 
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.drill.exec.store.ischema.Records;
 
+import java.io.IOException;
 import java.util.AbstractList;
 import java.util.HashSet;
 import java.util.List;
@@ -235,4 +228,42 @@ public class SqlHandlerUtil {
     writer.keyword(")");
   }
 
+  /**
+   * Drops table from schema.
+   * If the drop has failed, makes a concurrency check: checks if the table still exists.
+   * If the table still exists, re-throws the exception since the drop was unsuccessful,
+   * otherwise assumes that another user has already dropped the table and exits without error.
+   *
+   * @param drillSchema drill schema
+   * @param tableName table name
+   */
+  public static void dropTableFromSchema(AbstractSchema drillSchema, String tableName) {
+    try {
+      drillSchema.dropTable(tableName);
+    } catch (Exception e) {
+      if (SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Drops view from schema.
+   * If the drop has failed, makes a concurrency check: checks if the view still exists.
+   * If the view still exists, re-throws the exception since the drop was unsuccessful,
+   * otherwise assumes that another user has already dropped the view and exits without error.
+   *
+   * @param drillSchema drill schema
+   * @param viewName view name
+   */
+  public static void dropViewFromSchema(AbstractSchema drillSchema, String viewName) throws IOException {
+    try {
+      drillSchema.dropView(viewName);
+    } catch (Exception e) {
+      if (SqlHandlerUtil.getTableFromSchema(drillSchema, viewName) != null) {
+        throw e;
+      }
+    }
+  }
+
 }
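
The drop-with-concurrency-check idiom shared by both methods, extracted into a stand-alone sketch. The functional parameters are illustrative, not part of the patch:

    import java.util.function.BooleanSupplier;

    class TolerantDropSketch {
      /** Attempts the drop; swallows the failure only if the object has meanwhile disappeared. */
      static void drop(Runnable dropAction, BooleanSupplier stillExists) {
        try {
          dropAction.run();
        } catch (RuntimeException e) {
          if (stillExists.getAsBoolean()) {
            throw e; // the object still exists, so the drop genuinely failed
          }
          // otherwise another user dropped it concurrently; exit without error
        }
      }
    }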

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
index b8396e6..495e8b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
@@ -62,9 +62,10 @@ public abstract class ViewHandler extends DefaultSqlHandler {
 
       final String newViewName = createView.getName();
 
+      // Disallow temporary table usage in view definitions
+      config.getConverter().disallowTemporaryTables();
       // Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below.
       final String viewSql = createView.getQuery().toString();
-
       final ConvertedRelNode convertedRelNode = validateAndConvert(createView.getQuery());
       final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
       final RelNode queryRelNode = convertedRelNode.getConvertedNode();
@@ -74,36 +75,50 @@ public abstract class ViewHandler extends DefaultSqlHandler {
       final SchemaPlus defaultSchema = context.getNewDefaultSchema();
       final AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, createView.getSchemaPath());
 
-      final String schemaPath = drillSchema.getFullSchemaName();
       final View view = new View(newViewName, viewSql, newViewRelNode.getRowType(),
           SchemaUtilites.getSchemaPathAsList(defaultSchema));
 
-      final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, newViewName);
-
-      if (existingTable != null) {
-        if (existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
-          // existing table is not a view
-          throw UserException.validationError()
-              .message("A non-view table with given name [%s] already exists in schema [%s]",
-                  newViewName, schemaPath)
-              .build(logger);
-        }
-
-        if (existingTable.getJdbcTableType() == Schema.TableType.VIEW && !createView.getReplace()) {
-          // existing table is a view and create view has no "REPLACE" clause
-          throw UserException.validationError()
-              .message("A view with given name [%s] already exists in schema [%s]",
-                  newViewName, schemaPath)
-              .build(logger);
-        }
-      }
+      validateViewCreationPossibility(drillSchema, createView, context);
 
       final boolean replaced = drillSchema.createView(view);
       final String summary = String.format("View '%s' %s successfully in '%s' schema",
-          createView.getName(), replaced ? "replaced" : "created", schemaPath);
+          createView.getName(), replaced ? "replaced" : "created", drillSchema.getFullSchemaName());
 
       return DirectPlan.createDirectPlan(context, true, summary);
     }
+
+    /**
+     * Validates whether the view can be created in the indicated schema:
+     * checks if an object (persistent / temporary table) with the same name exists,
+     * or if a view with the same name exists but the replace flag is not set.
+     *
+     * @param drillSchema schema where the view will be created
+     * @param view create view call
+     * @param context query context
+     * @throws UserException if the view cannot be created in the indicated schema
+     */
+    private void validateViewCreationPossibility(AbstractSchema drillSchema, SqlCreateView view, QueryContext context) {
+      final String schemaPath = drillSchema.getFullSchemaName();
+      final String viewName = view.getName();
+      final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName);
+
+      if ((existingTable != null && existingTable.getJdbcTableType() != Schema.TableType.VIEW) ||
+          context.getSession().isTemporaryTable(drillSchema, context.getConfig(), viewName)) {
+        // existing table is not a view
+        throw UserException
+            .validationError()
+            .message("A non-view table with given name [%s] already exists in schema [%s]", viewName, schemaPath)
+            .build(logger);
+      }
+
+      if ((existingTable != null && existingTable.getJdbcTableType() == Schema.TableType.VIEW) && !view.getReplace()) {
+        // existing table is a view and create view has no "REPLACE" clause
+        throw UserException
+            .validationError()
+            .message("A view with given name [%s] already exists in schema [%s]", viewName, schemaPath)
+            .build(logger);
+      }
+    }
   }
 
   /** Handler for Drop View [If Exists] DDL command. */
@@ -124,7 +139,7 @@ public abstract class ViewHandler extends DefaultSqlHandler {
       final Table viewToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName);
       if (dropView.checkViewExistence()) {
         if (viewToDrop == null || viewToDrop.getJdbcTableType() != Schema.TableType.VIEW){
-          return DirectPlan.createDirectPlan(context, true,
+          return DirectPlan.createDirectPlan(context, false,
               String.format("View [%s] not found in schema [%s].", viewName, schemaPath));
         }
       } else {
@@ -139,7 +154,7 @@ public abstract class ViewHandler extends DefaultSqlHandler {
         }
       }
 
-      drillSchema.dropView(viewName);
+      SqlHandlerUtil.dropViewFromSchema(drillSchema, viewName);
 
       return DirectPlan.createDirectPlan(context, true,
           String.format("View [%s] deleted successfully from schema [%s].", viewName, schemaPath));

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index 53e3cd5..db934e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -161,7 +161,7 @@ public class CompoundIdentifierConverter extends SqlShuttle {
   //SqlNode offset,
   //SqlNode fetch,
     rules.put(SqlSelect.class, R(D, E, D, E, E, E, E, E, D, D));
-    rules.put(SqlCreateTable.class, R(D, D, D, E));
+    rules.put(SqlCreateTable.class, R(D, D, D, E, D));
     rules.put(SqlCreateView.class, R(D, E, E, D));
     rules.put(SqlDescribeTable.class, R(D, D, E));
     rules.put(SqlDropView.class, R(D, D));

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index 5835b10..bba60b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -48,8 +48,13 @@ public class SqlCreateTable extends DrillSqlCall {
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.OTHER) {
     @Override
     public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
-      Preconditions.checkArgument(operands.length == 4, "SqlCreateTable.createCall() has to get 4 operands!");
-      return new SqlCreateTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], (SqlNodeList) operands[2], operands[3]);
+      Preconditions.checkArgument(operands.length == 5, "SqlCreateTable.createCall() has to get 5 operands!");
+      return new SqlCreateTable(pos,
+          (SqlIdentifier) operands[0],
+          (SqlNodeList) operands[1],
+          (SqlNodeList) operands[2],
+          operands[3],
+          (SqlLiteral) operands[4]);
     }
   };
 
@@ -57,13 +62,20 @@ public class SqlCreateTable extends DrillSqlCall {
   private final SqlNodeList fieldList;
   private final SqlNodeList partitionColumns;
   private final SqlNode query;
-
-  public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNodeList partitionColumns, SqlNode query) {
+  private final SqlLiteral isTemporary;
+
+  public SqlCreateTable(SqlParserPos pos,
+                        SqlIdentifier tblName,
+                        SqlNodeList fieldList,
+                        SqlNodeList partitionColumns,
+                        SqlNode query,
+                        SqlLiteral isTemporary) {
     super(pos);
     this.tblName = tblName;
     this.fieldList = fieldList;
     this.partitionColumns = partitionColumns;
     this.query = query;
+    this.isTemporary = isTemporary;
   }
 
   @Override
@@ -78,12 +90,16 @@ public class SqlCreateTable extends DrillSqlCall {
     ops.add(fieldList);
     ops.add(partitionColumns);
     ops.add(query);
+    ops.add(isTemporary);
     return ops;
   }
 
   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
     writer.keyword("CREATE");
+    if (isTemporary.booleanValue()) {
+      writer.keyword("TEMPORARY");
+    }
     writer.keyword("TABLE");
     tblName.unparse(writer, leftPrec, rightPrec);
     if (fieldList.size() > 0) {
@@ -142,4 +158,6 @@ public class SqlCreateTable extends DrillSqlCall {
 
   public SqlNode getQuery() { return query; }
 
+  public boolean isTemporary() { return isTemporary.booleanValue(); }
+
 }
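
A hypothetical illustration of how a parser rule could populate the new fifth operand; SqlNodeList.EMPTY and SqlLiteral.createBoolean() are standard Calcite APIs:

    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.SqlNodeList;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;

    class CreateTableNodeSketch {
      /** Builds a CREATE TEMPORARY TABLE node with no field list and no partition columns. */
      static SqlCreateTable temporaryNode(SqlIdentifier name, SqlNode query) {
        SqlParserPos pos = SqlParserPos.ZERO;
        return new SqlCreateTable(pos, name,
            SqlNodeList.EMPTY,                    // no explicit field list
            SqlNodeList.EMPTY,                    // no partition columns
            query,
            SqlLiteral.createBoolean(true, pos)); // the TEMPORARY flag
      }
    }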

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 4e17249..281b124 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -299,6 +299,17 @@ public class UserServer extends BasicServer<RpcType, UserClientConnectionImpl> {
     public SocketAddress getRemoteAddress() {
       return getChannel().remoteAddress();
     }
+
+    @Override
+    public void closeSession() {
+      session.close();
+    }
+
+    @Override
+    public void close() {
+      closeSession();
+      super.close();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 3bf9051..c3639d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.rpc.user;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Preconditions;
@@ -27,9 +32,13 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
+import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -37,8 +46,14 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 
 import com.google.common.collect.Maps;
-
-public class UserSession {
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class UserSession implements Closeable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
 
   public static final String SCHEMA = "schema";
@@ -54,18 +69,43 @@ public class UserSession {
   private Map<String, String> properties;
   private OptionManager sessionOptions;
   private final AtomicInteger queryCount;
+  private final String sessionId;
+
+  /** Stores the list of temporary tables: key is the original table name converted to lower case to achieve case-insensitivity,
+   *  value is the generated table name. */
+  private final ConcurrentMap<String, String> temporaryTables;
+  /** Stores the list of session temporary locations: key is the path to the location, value is the file system associated with the location. */
+  private final ConcurrentMap<Path, FileSystem> temporaryLocations;
+
+  /** On session close deletes all session temporary locations recursively and clears temporary locations list. */
+  @Override
+  public void close() {
+    for (Map.Entry<Path, FileSystem> entry : temporaryLocations.entrySet()) {
+      Path path = entry.getKey();
+      FileSystem fs = entry.getValue();
+      try {
+        fs.delete(path, true);
+        logger.info("Deleted session temporary location [{}] from file system [{}]",
+            path.toUri().getPath(), fs.getUri());
+      } catch (Exception e) {
+        logger.warn("Error during session temporary location [{}] deletion from file system [{}]: [{}]",
+            path.toUri().getPath(), fs.getUri(), e.getMessage());
+      }
+    }
+    temporaryLocations.clear();
+  }
 
   /**
    * Implementations of this interface are allowed to increment queryCount.
    * {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface.
    * No other core class should implement this interface. Test classes may implement (see ControlsInjectionUtil).
    */
-  public static interface QueryCountIncrementer {
-    public void increment(final UserSession session);
+  public interface QueryCountIncrementer {
+    void increment(final UserSession session);
   }
 
   public static class Builder {
-    UserSession userSession;
+    private UserSession userSession;
 
     public static Builder newBuilder() {
       return new Builder();
@@ -115,6 +155,9 @@ public class UserSession {
 
   private UserSession() {
     queryCount = new AtomicInteger(0);
+    sessionId = UUID.randomUUID().toString();
+    temporaryTables = Maps.newConcurrentMap();
+    temporaryLocations = Maps.newConcurrentMap();
   }
 
   public boolean isSupportComplexTypes() {
@@ -197,7 +240,7 @@ public class UserSession {
 
   /**
    * Get default schema from current default schema path and given schema tree.
-   * @param rootSchema
+   * @param rootSchema root schema
    * @return A {@link org.apache.calcite.schema.SchemaPlus} object.
    */
   public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) {
@@ -207,18 +250,117 @@ public class UserSession {
       return null;
     }
 
-    final SchemaPlus defaultSchema = SchemaUtilites.findSchema(rootSchema, defaultSchemaPath);
+    return SchemaUtilites.findSchema(rootSchema, defaultSchemaPath);
+  }
+
+  public boolean setSessionOption(String name, String value) {
+    return true;
+  }
+
+  /**
+   * @return unique session identifier
+   */
+  public String getSessionId() { return sessionId; }
+
+  /**
+   * Creates and adds a session temporary location if absent, using the schema configuration.
+   * Generates a temporary table name and stores the original name as key
+   * and the generated name as value in the session temporary tables cache.
+   * The original table name is converted to lower case to achieve case-insensitivity.
+   * If the original table name is already registered, a new name is not generated and the existing one is reused.
+   * This can happen if the default temporary workspace was changed (file system or location) or
+   * an orphan temporary table name has remained (the name was registered but table creation did not succeed).
+   *
+   * @param schema table schema
+   * @param tableName original table name
+   * @return generated temporary table name
+   * @throws IOException if an error occurs during session temporary location creation
+   */
+  public String registerTemporaryTable(AbstractSchema schema, String tableName) throws IOException {
+    addTemporaryLocation((WorkspaceSchemaFactory.WorkspaceSchema) schema);
+    String temporaryTableName = Paths.get(sessionId, UUID.randomUUID().toString()).toString();
+    String oldTemporaryTableName = temporaryTables.putIfAbsent(tableName.toLowerCase(), temporaryTableName);
+    return oldTemporaryTableName == null ? temporaryTableName : oldTemporaryTableName;
+  }
+
+  /**
+   * Returns the generated temporary table name from the list of session temporary tables if present, null otherwise.
+   * The original table name is converted to lower case to achieve case-insensitivity.
+   *
+   * @param tableName original table name
+   * @return generated temporary table name
+   */
+  public String resolveTemporaryTableName(String tableName) {
+    return temporaryTables.get(tableName.toLowerCase());
+  }
 
-    if (defaultSchema == null) {
-      // If the current schema resolves to null, return root schema as the current default schema.
-      return defaultSchema;
+  /**
+   * Checks if the passed table is temporary; the table name is case-insensitive.
+   * Before looking for the table, checks if the passed schema is temporary and returns false if not,
+   * since temporary tables are allowed to be created in the temporary workspace only.
+   * If the passed workspace is temporary, looks for the temporary table.
+   * First checks if the table name is among the temporary tables; if not, returns false.
+   * If a temporary table name was resolved, checks that the temporary table exists on disk,
+   * to ensure that the temporary table actually exists and the resolved table name is not an orphan
+   * (for example, as a result of unsuccessful temporary table creation).
+   *
+   * @param drillSchema table schema
+   * @param config drill config
+   * @param tableName original table name
+   * @return true if temporary table exists in schema, false otherwise
+   */
+  public boolean isTemporaryTable(AbstractSchema drillSchema, DrillConfig config, String tableName) {
+    if (!SchemaUtilites.isTemporaryWorkspace(drillSchema.getFullSchemaName(), config)) {
+      return false;
     }
+    String temporaryTableName = resolveTemporaryTableName(tableName);
+    if (temporaryTableName != null) {
+      Table temporaryTable = SqlHandlerUtil.getTableFromSchema(drillSchema, temporaryTableName);
+      if (temporaryTable != null && temporaryTable.getJdbcTableType() == Schema.TableType.TABLE) {
+        return true;
+      }
+    }
+    return false;
+  }
 
-    return defaultSchema;
+  /**
+   * Removes the temporary table name from the list of session temporary tables
+   * and drops the underlying table from the schema.
+   * The original table name is converted to lower case to achieve case-insensitivity.
+   *
+   * @param drillSchema table schema
+   * @param tableName original table name
+   */
+  public void removeTemporaryTable(AbstractSchema drillSchema, String tableName) {
+    String temporaryTable = resolveTemporaryTableName(tableName);
+    if (temporaryTable == null) {
+      return;
+    }
+    SqlHandlerUtil.dropTableFromSchema(drillSchema, temporaryTable);
+    temporaryTables.remove(tableName.toLowerCase());
   }
 
-  public boolean setSessionOption(String name, String value) {
-    return true;
+  /**
+   * Session temporary tables are stored under the temporary workspace location in a session folder
+   * defined by the unique session id. These session temporary locations are deleted on session close.
+   * If the default temporary workspace file system or location is changed at runtime,
+   * a new session temporary location with the corresponding file system is added
+   * to the list of session temporary locations. If the location does not exist it will be created and
+   * {@link StorageStrategy#TEMPORARY} storage rules will be applied to it.
+   *
+   * @param temporaryWorkspace temporary workspace
+   * @throws IOException in case of error during temporary location creation
+   */
+  private void addTemporaryLocation(WorkspaceSchemaFactory.WorkspaceSchema temporaryWorkspace) throws IOException {
+    DrillFileSystem fs = temporaryWorkspace.getFS();
+    Path temporaryLocation = new Path(Paths.get(fs.getUri().toString(),
+        temporaryWorkspace.getDefaultLocation(), sessionId).toString());
+
+    FileSystem fileSystem = temporaryLocations.putIfAbsent(temporaryLocation, fs);
+
+    if (fileSystem == null) {
+      StorageStrategy.TEMPORARY.createPathAndApply(fs, temporaryLocation);
+      Preconditions.checkArgument(fs.exists(temporaryLocation),
+          String.format("Temporary location should exist [%s]", temporaryLocation.toUri().getPath()));
+    }
   }
 
   private String getProp(String key) {
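
The registration contract can be modeled apart from the file system plumbing. A stand-alone sketch (not the real UserSession) showing why re-registration reuses the previously generated name instead of leaking a new one:

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class SessionNameMapSketch {
      private final String sessionId = UUID.randomUUID().toString();
      private final ConcurrentMap<String, String> temporaryTables = new ConcurrentHashMap<>();

      /** Idempotent per lower-cased original name: an orphan registration is reused, not leaked. */
      String register(String originalName) {
        String generated = sessionId + "/" + UUID.randomUUID();
        String previous = temporaryTables.putIfAbsent(originalName.toLowerCase(), generated);
        return previous == null ? generated : previous;
      }
    }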

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 547915e..25776ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.drill.exec.server;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.StackTrace;
 import org.apache.drill.common.config.DrillConfig;
@@ -30,13 +31,17 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
 import org.apache.drill.exec.server.rest.WebServer;
 import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaTreeProvider;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
@@ -123,6 +128,7 @@ public class Drillbit implements AutoCloseable {
     storageRegistry.init();
     drillbitContext.getOptionManager().init();
     javaPropertiesToSystemOptions();
+    validateTemporaryWorkspace(manager.getContext());
     manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord);
     registrationHandle = coord.register(md);
     webServer.start();
@@ -215,6 +221,21 @@ public class Drillbit implements AutoCloseable {
   }
 
   /**
+   * Validates that temporary workspace indicated in configuration is
+   * mutable and file-based (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}).
+   *
+   * @param context drillbit context
+   * @throws Exception if the temporary workspace is not mutable or
+   *                   not file-based (not an instance of WorkspaceSchemaFactory.WorkspaceSchema)
+   */
+  private void validateTemporaryWorkspace(DrillbitContext context) throws Exception {
+    try (SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(context)) {
+      final SchemaPlus rootSchema = schemaTreeProvider.createRootSchema(context.getOptionManager());
+      SchemaUtilites.getTemporaryWorkspace(rootSchema, context.getConfig());
+    }
+  }
+
+  /**
    * Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
    * occur during closure, as well as the location the drillbit was started from.
    */
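
For context, SchemaUtilites.getTemporaryWorkspace itself is not part of this
diff; the sketch below shows the kind of check the javadoc above implies it
performs. The helper name, error wording, and use of
UserException.validationError are assumptions, not taken from the commit.

    // Minimal sketch (assumed logic): the resolved temporary workspace must be
    // a mutable, file-based workspace schema; otherwise startup should fail fast.
    private static void checkTemporaryWorkspace(AbstractSchema schema) {
      if (!(schema instanceof WorkspaceSchemaFactory.WorkspaceSchema) || !schema.isMutable()) {
        throw UserException.validationError()
            .message("Temporary workspace [%s] must be file-based and mutable",
                schema.getSchemaPath())
            .build(logger);
      }
    }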

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 7a16d0a..618841b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -117,18 +117,33 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   }
 
   /**
+   * Creates a table entry using the table name, the list of partition columns,
+   * and the storage strategy used to create the table folder and files.
    *
    * @param tableName : new table name.
    * @param partitionColumns : list of partition columns. Empty list if there are no partition columns.
-   * @return
+   * @param storageStrategy : storage strategy used to create the table folder and files
+   * @return create table entry
    */
-  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
     throw UserException.unsupportedError()
         .message("Creating new tables is not supported in schema [%s]", getSchemaPath())
         .build(logger);
   }
 
   /**
+   * Creates a table entry using the table name and the list of partition columns, if any.
+   * The table folder and files will be created using the persistent storage strategy.
+   *
+   * @param tableName : new table name.
+   * @param partitionColumns : list of partition columns. Empty list if there are no partition columns.
+   * @return create table entry
+   */
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+    return createNewTable(tableName, partitionColumns, StorageStrategy.PERSISTENT);
+  }
+
+  /**
    * Reports whether to show items from this schema in INFORMATION_SCHEMA
    * tables.
    * (Controls ... TODO:  Doc.:  Mention what this typically controls or
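
The two createNewTable overloads above give writers a per-table choice of
storage strategy. A brief caller sketch follows; the schema variable, table
names, and partition list are illustrative, not from the commit.

    // Hypothetical caller: a regular CTAS keeps the persistent default, while
    // a temporary table gets owner-only permissions and delete-on-exit.
    CreateTableEntry persistent = schema.createNewTable("t1", partitionColumns);
    CreateTableEntry temporary  = schema.createNewTable("t2", partitionColumns,
        StorageStrategy.TEMPORARY);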

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
index d05cc43..4f426bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,10 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider;
 import org.apache.drill.exec.util.ImpersonationUtil;
 
@@ -49,6 +52,33 @@ public class SchemaTreeProvider implements AutoCloseable {
   }
 
   /**
+   * Returns the root schema for the process user.
+   *
+   * @param options option manager used to resolve schema configuration options
+   * @return root of the schema tree
+   */
+  public SchemaPlus createRootSchema(final OptionManager options) {
+    SchemaConfigInfoProvider schemaConfigInfoProvider = new SchemaConfigInfoProvider() {
+
+      @Override
+      public ViewExpansionContext getViewExpansionContext() {
+        throw new UnsupportedOperationException("View expansion context is not supported");
+      }
+
+      @Override
+      public OptionValue getOption(String optionKey) {
+        return options.getOption(optionKey);
+      }
+    };
+
+    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(
+        ImpersonationUtil.getProcessUserName(), schemaConfigInfoProvider)
+        .build();
+
+    return createRootSchema(schemaConfig);
+  }
+
+  /**
    * Return root schema with schema owner as the given user.
    *
    * @param userName Name of the user who is accessing the storage sources.
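
Because SchemaTreeProvider implements AutoCloseable, the new overload suits
short-lived, process-user lookups such as the startup validation in
Drillbit above. A minimal usage sketch (variable names are illustrative):

    // Build a root schema as the process user, then release the underlying
    // schema trees when done.
    try (SchemaTreeProvider provider = new SchemaTreeProvider(drillbitContext)) {
      SchemaPlus rootSchema = provider.createRootSchema(drillbitContext.getOptionManager());
      // ... resolve workspaces against rootSchema ...
    }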


[4/5] drill git commit: DRILL-5104: Foreman should not set external sort memory for a physical plan

Posted by jn...@apache.org.
DRILL-5104: Foreman should not set external sort memory for a physical plan

Physical plans include a plan for memory allocations. However, the code
path in Foreman replans external sort memory, even for a physical plan.
This makes it impossible to use a physical plan to test memory
configuration.

This change avoids altering memory settings in a physical plan while
preserving the adjustments for logical plans and SQL queries.

Revised to put a property in the plan itself. Old plans, and those
generated from SQL, will have memory allocations applied. Plans already
marked as "resource management" planned will be used as-is.

Includes a unit test that demonstrates the new behavior.

close apache/drill#703
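
A hand-written plan opts out by setting the new head property (see the JSON
test resource in the diff below); a plan built programmatically could do the
same through the builder. A sketch, assuming the hasResourcePlan(boolean)
builder setter shown in the PlanProperties diff:

    // Mark a programmatically built plan as already resource-planned so that
    // setupSortMemoryAllocations() leaves its sort allocations untouched.
    PlanProperties props = PlanProperties.builder()
        .hasResourcePlan(true)
        .build();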


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/82176976
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/82176976
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/82176976

Branch: refs/heads/master
Commit: 8217697647e7ed10c52cfa91b860730302a339e8
Parents: 08ca5e0
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Dec 13 14:36:42 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:39 2017 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/physical/PhysicalPlan.java  |  3 ---
 .../exec/util/MemoryAllocationUtilities.java      |  9 ++++++++-
 .../apache/drill/exec/work/foreman/Foreman.java   |  1 +
 .../impl/xsort/TestSimpleExternalSort.java        |  6 ++++--
 .../resources/xsort/one_key_sort_descending.json  |  3 ++-
 .../drill/common/logical/PlanProperties.java      | 18 ++++++++++++++++--
 6 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index 78b882b..e0902c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -62,10 +62,8 @@ public class PhysicalPlan {
     }else{
       return list;
     }
-
   }
 
-
   @JsonProperty("head")
   public PlanProperties getProperties() {
     return properties;
@@ -89,5 +87,4 @@ public class PhysicalPlan {
       throw new RuntimeException(e);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 38dfcd0..678167f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -41,6 +41,13 @@ public class MemoryAllocationUtilities {
    * @param queryContext
    */
   public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+
+    // Test plans may already have a pre-defined memory plan.
+    // Otherwise, determine memory allocation.
+
+    if (plan.getProperties().hasResourcePlan) {
+      return;
+    }
     // look for external sorts
     final List<ExternalSort> sortList = new LinkedList<>();
     for (final PhysicalOperator op : plan.getSortedOperators()) {
@@ -64,6 +71,6 @@ public class MemoryAllocationUtilities {
         externalSort.setMaxAllocation(maxSortAlloc);
       }
     }
+    plan.getProperties().hasResourcePlan = true;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c6a3104..30718b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -774,6 +774,7 @@ public class Foreman implements Runnable {
       }
     }
 
+    @SuppressWarnings("resource")
     @Override
     public void close() {
       Preconditions.checkState(!isClosed);

http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index b34a466..85975cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -42,7 +42,6 @@ import org.junit.rules.TestRule;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-@Ignore
 public class TestSimpleExternalSort extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
   DrillConfig c = DrillConfig.create();
@@ -50,6 +49,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
 
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
 
+  @Ignore
   @Test
   public void mergeSortWithSv2() throws Exception {
     List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json");
@@ -109,7 +109,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
 
     for (QueryDataBatch b : results) {
       if (b.getHeader().getRowCount() == 0) {
-        break;
+        continue;
       }
       batchCount++;
       RecordBatchLoader loader = new RecordBatchLoader(allocator);
@@ -132,6 +132,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
   }
 
   @Test
+  @Ignore
   public void sortOneKeyDescendingExternalSort() throws Throwable{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
 
@@ -186,6 +187,7 @@ public class TestSimpleExternalSort extends BaseTestQuery {
   }
 
   @Test
+  @Ignore
   public void outOfMemoryExternalSort() throws Throwable{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
index f4eab5d..b4794ad 100644
--- a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
+++ b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending.json
@@ -4,7 +4,8 @@
         version:"1",
         generator:{
             type:"manual"
-        }
+        },
+        hasResourcePlan: true
     },
     graph:[
         {

http://git-wip-us.apache.org/repos/asf/drill/blob/82176976/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index ce9603e..f4de0eb 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -35,6 +35,12 @@ public class PlanProperties {
   public JSONOptions options;
   public int queue;
 
+  /**
+   * Indicates whether the plan has already been planned for resource management
+   * (memory, etc.) or whether this planning must still be performed.
+   */
+  public boolean hasResourcePlan;
+
 //  @JsonInclude(Include.NON_NULL)
   public static class Generator {
     public String type;
@@ -55,7 +61,8 @@ public class PlanProperties {
                          @JsonProperty("type") PlanType type,
                          @JsonProperty("mode") ResultMode resultMode,
                          @JsonProperty("options") JSONOptions options,
-                         @JsonProperty("queue") int queue
+                         @JsonProperty("queue") int queue,
+                         @JsonProperty("hasResourcePlan") boolean hasResourcePlan
                          ) {
     this.version = version;
     this.queue = queue;
@@ -63,6 +70,7 @@ public class PlanProperties {
     this.type = type;
     this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
     this.options = options;
+    this.hasResourcePlan = hasResourcePlan;
   }
 
   public static PlanPropertiesBuilder builder() {
@@ -76,6 +84,7 @@ public class PlanProperties {
     private ResultMode mode = ResultMode.EXEC;
     private JSONOptions options;
     private int queueNumber = 0;
+    private boolean hasResourcePlan = false;
 
     public PlanPropertiesBuilder type(PlanType type) {
       this.type = type;
@@ -112,8 +121,13 @@ public class PlanProperties {
       return this;
     }
 
+    public PlanPropertiesBuilder hasResourcePlan(boolean hasResourcePlan) {
+      this.hasResourcePlan = hasResourcePlan;
+      return this;
+    }
+
     public PlanProperties build() {
-      return new PlanProperties(version, generator, type, mode, options, queueNumber);
+      return new PlanProperties(version, generator, type, mode, options, queueNumber, hasResourcePlan);
     }
 
   }


[3/5] drill git commit: DRILL-5097: Using store.parquet.reader.int96_as_timestamp gives IOOB whereas convert_from works

Posted by jn...@apache.org.
DRILL-5097: Using store.parquet.reader.int96_as_timestamp gives IOOB whereas convert_from works

close apache/drill#697


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/08ca5e09
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/08ca5e09
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/08ca5e09

Branch: refs/heads/master
Commit: 08ca5e0923436ebeb3e190c60892167d9beab0c2
Parents: bb29f19
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Wed Dec 14 16:24:08 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:20 2017 -0800

----------------------------------------------------------------------
 .../NullableFixedByteAlignedReaders.java        |  10 +++++++++-
 .../physical/impl/writer/TestParquetWriter.java |  19 +++++++++++++++++++
 .../test/resources/parquet/data.snappy.parquet  | Bin 0 -> 59200 bytes
 3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index e20504f..b233a65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -110,9 +110,14 @@ public class NullableFixedByteAlignedReaders {
 
   /**
    * Class for reading parquet fixed binary type INT96, which is used for storing hive,
-   * impala timestamp values with nanoseconds precision. So it reads such values as a drill timestamp.
+   * impala timestamp values with nanosecond precision (12 bytes), so it reads such values as a drill timestamp (8 bytes).
    */
   static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> {
+    /**
+     * The width of each element of the TimeStampVector is 8 bytes (64 bits).
+     */
+    private static final int TIMESTAMP_LENGTH_IN_BITS = 64;
+
     NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                               ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
       super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
@@ -132,6 +137,9 @@ public class NullableFixedByteAlignedReaders {
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
         }
       }
+      // The nanosecond precision is truncated to milliseconds, so the length of a single
+      // timestamp value is 8 bytes instead of 12 bytes.
+      dataTypeLengthInBits = TIMESTAMP_LENGTH_IN_BITS;
     }
   }
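
getDateTimeValueFromBinary is defined elsewhere in the reader; for
orientation, a sketch of the conventional INT96 layout and its reduction to
an 8-byte millisecond timestamp follows. The helper and constant names are
illustrative, not taken from the commit.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // An INT96 timestamp is 12 bytes: 8 bytes of nanoseconds within the day
    // (little-endian) followed by a 4-byte Julian day number. Drill keeps the
    // result as an 8-byte millisecond timestamp, hence the 64-bit data length.
    static long int96ToMillis(byte[] bytes) {
      final long JULIAN_EPOCH_DAY = 2440588;          // Julian day of 1970-01-01
      final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
      ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
      long nanosOfDay = buf.getLong();                // first 8 bytes
      long julianDay = buf.getInt() & 0xFFFFFFFFL;    // last 4 bytes
      return (julianDay - JULIAN_EPOCH_DAY) * MILLIS_PER_DAY + nanosOfDay / 1_000_000L;
    }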
 

http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index ae0e699..362c943 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -963,5 +964,23 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
+  @Test // DRILL-5097
+  public void testInt96TimeStampValueWidth() throws Exception {
+    try {
+      testBuilder()
+          .ordered()
+          .sqlQuery("select c, d from cp.`parquet/data.snappy.parquet` where d = '2015-07-18 13:52:51'")
+          .optionSettingQueriesForTestQuery(
+              "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+          .baselineColumns("c", "d")
+          .baselineValues(new DateTime(Date.valueOf("2011-04-11").getTime()),
+              new DateTime(Timestamp.valueOf("2015-07-18 13:52:51").getTime()))
+          .build()
+          .run();
+    } finally {
+      test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/08ca5e09/exec/java-exec/src/test/resources/parquet/data.snappy.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/data.snappy.parquet b/exec/java-exec/src/test/resources/parquet/data.snappy.parquet
new file mode 100644
index 0000000..1ce3d75
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/data.snappy.parquet differ