You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/02/02 03:35:57 UTC

[2/2] drill git commit: DRILL-4185: UNION ALL involving empty directory on any side of union all results in Failed query

DRILL-4185: UNION ALL involving empty directory on any side of union all results in Failed query

closes #1083


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f3020081
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f3020081
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f3020081

Branch: refs/heads/master
Commit: f30200812eb27c76b8d4d246008cbd9bd59fb0a5
Parents: 07dae3c
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Fri Dec 1 22:48:05 2017 +0200
Committer: Ben-Zvi <bb...@mapr.com>
Committed: Thu Feb 1 18:05:34 2018 -0800

----------------------------------------------------------------------
 .../physical/base/SchemalessBatchCreator.java   |  40 +++
 .../exec/physical/base/SchemalessScan.java      |  94 ++++++
 .../physical/impl/join/NestedLoopJoinBatch.java |  86 ++---
 .../drill/exec/planner/logical/DrillTable.java  |   8 +-
 .../exec/planner/physical/UnionAllPrule.java    |   3 +-
 .../sql/handlers/RefreshMetadataHandler.java    |  14 +-
 .../drill/exec/record/SchemalessBatch.java      | 108 +++++++
 .../drill/exec/store/dfs/FileSelection.java     |  21 ++
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |   5 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  12 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  23 +-
 .../java/org/apache/drill/TestJoinNullable.java |   2 +-
 .../java/org/apache/drill/TestUnionAll.java     | 136 ++++++--
 .../org/apache/drill/TestUnionDistinct.java     |  76 +++++
 .../apache/drill/exec/TestEmptyInputSql.java    |  43 ++-
 .../exec/physical/impl/join/JoinTestBase.java   |  61 +++-
 .../impl/join/TestHashJoinAdvanced.java         |  17 +-
 .../impl/join/TestJoinEmptyDirTable.java        | 321 +++++++++++++++++++
 .../impl/join/TestMergeJoinAdvanced.java        |  25 +-
 .../physical/impl/join/TestNestedLoopJoin.java  |  42 ++-
 .../drill/exec/store/dfs/TestFileSelection.java |  18 +-
 .../store/dfs/TestSchemaNotFoundException.java  |   4 +-
 .../store/parquet/TestParquetGroupScan.java     |  64 ++--
 .../store/parquet/TestParquetMetadataCache.java |  60 +++-
 .../apache/drill/test/BaseDirTestWatcher.java   |   2 +-
 .../apache/drill/test/rowSet/SchemaBuilder.java |   9 +-
 26 files changed, 1069 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessBatchCreator.java
new file mode 100644
index 0000000..0fb8f45
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessBatchCreator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.SchemalessBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * The operator for creating {@link SchemalessBatch} instances
+ */
+public class SchemalessBatchCreator implements BatchCreator<SchemalessScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, SchemalessScan subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new SchemalessBatch();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
new file mode 100644
index 0000000..e0db1ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.List;
+
+/**
+ *  The type of scan operator that allows scanning schemaless tables ({@link DynamicDrillTable} with null selection)
+ */
+@JsonTypeName("schemaless-scan")
+public class SchemalessScan extends AbstractFileGroupScan implements SubScan {
+
+  private final String selectionRoot;
+
+  public SchemalessScan(String userName, String selectionRoot) {
+    super(userName);
+    this.selectionRoot = selectionRoot;
+  }
+
+  public SchemalessScan(final SchemalessScan that) {
+    super(that);
+    this.selectionRoot = that.selectionRoot;
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    return this;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    final String pattern = "SchemalessScan [selectionRoot = %s]";
+    return String.format(pattern, selectionRoot);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    // A scan is a leaf operator: reject any children, but accept null as "no children" instead of failing with an NPE.
+    Preconditions.checkArgument(children == null || children.isEmpty());
+    return new SchemalessScan(this);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    return ScanStats.ZERO_RECORD_TABLE;
+  }
+
+  @Override
+  public boolean supportsPartitionFilterPushdown() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index fa8c13a..9fc734d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -146,7 +146,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         return IterOutcome.NONE;
       }
 
-      boolean drainRight = true;
+      boolean drainRight = rightUpstream != IterOutcome.NONE ? true : false;
       while (drainRight) {
         rightUpstream = next(RIGHT_INPUT, right);
         switch (rightUpstream) {
@@ -267,22 +267,24 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
 
     int fieldId = 0;
     int outputFieldId = 0;
-    // Set the input and output value vector references corresponding to the left batch
-    for (MaterializedField field : leftSchema) {
-      final TypeProtos.MajorType fieldType = field.getType();
-
-      // Add the vector to the output container
-      container.addOrGet(field);
-
-      JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
-          new TypedFieldId(fieldType, false, fieldId));
-      JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
-          new TypedFieldId(fieldType, false, outputFieldId));
-
-      nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
-      nLJClassGenerator.rotateBlock();
-      fieldId++;
-      outputFieldId++;
+    if (leftSchema != null) {
+      // Set the input and output value vector references corresponding to the left batch
+      for (MaterializedField field : leftSchema) {
+        final TypeProtos.MajorType fieldType = field.getType();
+
+        // Add the vector to the output container
+        container.addOrGet(field);
+
+        JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
+            new TypedFieldId(fieldType, false, fieldId));
+        JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+            new TypedFieldId(fieldType, false, outputFieldId));
+
+        nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
+        nLJClassGenerator.rotateBlock();
+        fieldId++;
+        outputFieldId++;
+      }
     }
 
     // generate emitRight
@@ -291,32 +293,34 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     JExpression batchIndex = JExpr.direct("batchIndex");
     JExpression recordIndexWithinBatch = JExpr.direct("recordIndexWithinBatch");
 
-    // Set the input and output value vector references corresponding to the right batch
-    for (MaterializedField field : rightSchema) {
+    if (rightSchema != null) {
+      // Set the input and output value vector references corresponding to the right batch
+      for (MaterializedField field : rightSchema) {
+
+        final TypeProtos.MajorType inputType = field.getType();
+        TypeProtos.MajorType outputType;
+        // if join type is LEFT, make sure right batch output fields data mode is optional
+        if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
 
-      final TypeProtos.MajorType inputType = field.getType();
-      TypeProtos.MajorType outputType;
-      // if join type is LEFT, make sure right batch output fields data mode is optional
-      if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
-        outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
-      } else {
-        outputType = inputType;
+        MaterializedField newField = MaterializedField.create(field.getName(), outputType);
+        container.addOrGet(newField);
+
+        JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer",
+            new TypedFieldId(inputType, true, fieldId));
+        JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+            new TypedFieldId(outputType, false, outputFieldId));
+        nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
+            .arg(recordIndexWithinBatch)
+            .arg(outIndex)
+            .arg(inVV.component(batchIndex)));
+        nLJClassGenerator.rotateBlock();
+        fieldId++;
+        outputFieldId++;
       }
-
-      MaterializedField newField = MaterializedField.create(field.getName(), outputType);
-      container.addOrGet(newField);
-
-      JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer",
-          new TypedFieldId(inputType, true, fieldId));
-      JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
-          new TypedFieldId(outputType, false, outputFieldId));
-      nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
-          .arg(recordIndexWithinBatch)
-          .arg(outIndex)
-          .arg(inVV.component(batchIndex)));
-      nLJClassGenerator.rotateBlock();
-      fieldId++;
-      outputFieldId++;
     }
 
     return context.getImplementationClass(nLJCodeGenerator);

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 9a0d369..4ee7671 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -30,8 +30,10 @@ import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.util.ImpersonationUtil;
 
 public abstract class DrillTable implements Table {
@@ -85,7 +87,11 @@ public abstract class DrillTable implements Table {
 
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
-      this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
+      if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
+        this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot());
+      } else {
+        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
+      }
     }
     return scan;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
index 336ab3a..7ea0202 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -40,8 +40,7 @@ public class UnionAllPrule extends Prule {
   protected static final Logger tracer = CalciteTrace.getPlannerTracer();
 
   private UnionAllPrule() {
-    super(
-        RelOptHelper.any(DrillUnionRel.class), "Prel.UnionAllPrule");
+    super(RelOptHelper.any(DrillUnionRel.class), "Prel.UnionAllPrule");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index b36356a..6bfceb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,6 +33,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
@@ -78,11 +79,11 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
 
       final Table table = schema.getTable(tableName);
 
-      if(table == null){
+      if (table == null) {
         return direct(false, "Table %s does not exist.", tableName);
       }
 
-      if(! (table instanceof DrillTable) ){
+      if (!(table instanceof DrillTable)) {
         return notSupported(tableName);
       }
 
@@ -90,7 +91,12 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
       final DrillTable drillTable = (DrillTable) table;
 
       final Object selection = drillTable.getSelection();
-      if( !(selection instanceof FormatSelection) ){
+
+      if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
+        return direct(false, "Table %s is empty and doesn't contain any parquet files.", tableName);
+      }
+
+      if (!(selection instanceof FormatSelection)) {
         return notSupported(tableName);
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
new file mode 100644
index 0000000..ff0cfaf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+/**
+ * Empty batch without schema and data.
+ */
+public class SchemalessBatch implements CloseableRecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemalessBatch.class);
+
+  public SchemalessBatch() {
+    logger.debug("Empty schemaless batch is created");
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return null;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return null;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return 0;
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException(String.format("You should not call getSelectionVector2() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException(String.format("You should not call getSelectionVector4() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    throw new UnsupportedOperationException(String.format("You should not call getOutgoingContainer() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    throw new UnsupportedOperationException(String.format("You should not call getValueVectorId() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    throw new UnsupportedOperationException(String.format("You should not call getValueAccessorById() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public IterOutcome next() {
+    return IterOutcome.NONE;
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException(String.format("You should not call getWritableBatch() for class %s",
+        this.getClass().getCanonicalName()));
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return java.util.Collections.<VectorWrapper<?>>emptyIterator(); // never null: this batch holds no vectors
+  }
+
+  @Override
+  public void close() throws Exception {
+    // This is present to match the return type of BatchCreator#getBatch().
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 7fa981b..7edb327 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -56,6 +56,11 @@ public class FileSelection {
    */
   private MetadataContext metaContext = null;
 
+  /**
+   * Indicates whether this selectionRoot is an empty directory
+   */
+  private boolean emptyDirectory;
+
   private enum StatusType {
     NOT_CHECKED,         // initial state
     NO_DIRS,             // no directories in this selection
@@ -424,6 +429,22 @@ public class FileSelection {
     return metaContext;
   }
 
+  /**
+   * @return true if this {@link FileSelection#selectionRoot} points to an empty directory, false otherwise
+   */
+  public boolean isEmptyDirectory() {
+    return emptyDirectory;
+  }
+
+  /**
+   * Marks this {@link FileSelection#selectionRoot} as an empty directory by setting
+   * {@link FileSelection#emptyDirectory} to true
+   */
+  public void setEmptyDirectoryStatus() {
+    this.emptyDirectory = true;
+  }
+
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index a3886bb..7640c13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -69,7 +69,6 @@ import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -632,7 +631,9 @@ public class WorkspaceSchemaFactory {
 
         final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
         if (newSelection == null) {
-          return null;
+          // An empty directory / selection means that this is an empty, schemaless table
+          fileSelection.setEmptyDirectoryStatus();
+          return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
         }
 
         for (final FormatMatcher matcher : fileMatchers) {

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e457e18..e48239b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -29,8 +29,10 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -165,9 +167,15 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
-  public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
       throws IOException {
-    return new ParquetGroupScan(userName, selection, this, columns);
+    ParquetGroupScan parquetGroupScan = new ParquetGroupScan(userName, selection, this, columns);
+    if (parquetGroupScan.getEntries().isEmpty()) {
+      // If ParquetGroupScan does not contain any entries, the selection directories are empty and
+      // the metadata cache files are invalid, so return a schemaless scan instead
+      return new SchemalessScan(userName, parquetGroupScan.getSelectionRoot());
+    }
+    return parquetGroupScan;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 75b18df..aa001f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -210,18 +210,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     this.entries = Lists.newArrayList();
 
-    if (checkForInitializingEntriesWithSelectionRoot()) {
-      // The fully expanded list is already stored as part of the fileSet
-      entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
-    } else {
-      for (String fileName : fileSelection.getFiles()) {
-        entries.add(new ReadEntryWithPath(fileName));
+    if (fileSelection != null) {
+      if (checkForInitializingEntriesWithSelectionRoot()) {
+        // The fully expanded list is already stored as part of the fileSet
+        entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
+      } else {
+        for (String fileName : fileSelection.getFiles()) {
+          entries.add(new ReadEntryWithPath(fileName));
+        }
       }
-    }
 
-    this.filter = filter;
+      this.filter = filter;
 
-    init();
+      init();
+    }
   }
 
   /*
@@ -875,7 +877,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     if (fileSet.isEmpty()) {
       // no files were found, most likely we tried to query some empty sub folders
-      throw UserException.validationError().message("The table you tried to query is empty").build(logger);
+      logger.warn("The table is empty but with outdated invalid metadata cache files. Please, delete them.");
+      return null;
     }
 
     List<String> fileNames = Lists.newArrayList(fileSet);

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
index 89a9e9d..949acf3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 655d036..8f954bc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -26,9 +26,11 @@ import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,9 +49,12 @@ public class TestUnionAll extends BaseTestQuery {
   private static final String enableDistribute = "alter session set `planner.enable_unionall_distribute` = true";
   private static final String defaultDistribute = "alter session reset `planner.enable_unionall_distribute`";
 
+  private static final String EMPTY_DIR_NAME = "empty_directory";
+
   @BeforeClass
   public static void setupTestFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "parquet"));
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
   }
 
   @Test  // Simple Union-All over two scans
@@ -209,19 +214,19 @@ public class TestUnionAll extends BaseTestQuery {
 
   @Test
   public void testUnionAllViewExpandableStar() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
-    test("create view region_view_testunionall as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
+    try {
+      test("use dfs.tmp");
+      test("create view nation_view_testunionall_expandable_star as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
+      test("create view region_view_testunionall_expandable_star as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
 
-    String query1 = "(select * from dfs.tmp.`nation_view_testunionall`) " +
-                    "union all " +
-                    "(select * from dfs.tmp.`region_view_testunionall`) ";
+      String query1 = "(select * from dfs.tmp.`nation_view_testunionall_expandable_star`) " +
+          "union all " +
+          "(select * from dfs.tmp.`region_view_testunionall_expandable_star`) ";
 
-    String query2 =  "(select r_name, r_regionkey from cp.`tpch/region.parquet`) " +
-                     "union all " +
-                     "(select * from dfs.tmp.`nation_view_testunionall`)";
+      String query2 =  "(select r_name, r_regionkey from cp.`tpch/region.parquet`) " +
+          "union all " +
+          "(select * from dfs.tmp.`nation_view_testunionall_expandable_star`)";
 
-    try {
       testBuilder()
           .sqlQuery(query1)
           .unOrdered()
@@ -238,42 +243,42 @@ public class TestUnionAll extends BaseTestQuery {
           .baselineColumns("r_name", "r_regionkey")
           .build().run();
     } finally {
-      test("drop view nation_view_testunionall");
-      test("drop view region_view_testunionall");
+      test("drop view if exists nation_view_testunionall_expandable_star");
+      test("drop view if exists region_view_testunionall_expandable_star");
     }
   }
 
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2002
   public void testUnionAllViewUnExpandableStar() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunionall as select * from cp.`tpch/nation.parquet`;");
-
     try {
-      String query = "(select * from dfs.tmp.`nation_view_testunionall`) " +
+      test("use dfs.tmp");
+      test("create view nation_view_testunionall_expandable_star as select * from cp.`tpch/nation.parquet`;");
+
+      String query = "(select * from dfs.tmp.`nation_view_testunionall_expandable_star`) " +
                      "union all (select * from cp.`tpch/region.parquet`)";
       test(query);
     } catch(UserException ex) {
       SqlUnsupportedException.errorClassNameToException(ex.getOrCreatePBError(false).getException().getExceptionClass());
       throw ex;
     } finally {
-      test("drop view nation_view_testunionall");
+      test("drop view if exists nation_view_testunionall_expandable_star");
     }
   }
 
   @Test
   public void testDiffDataTypesAndModes() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
-    test("create view region_view_testunionall as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
+    try {
+      test("use dfs.tmp");
+      test("create view nation_view_testunionall_expandable_star as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
+      test("create view region_view_testunionall_expandable_star as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
 
-    String t1 = "(select n_comment, n_regionkey from cp.`tpch/nation.parquet` limit 5)";
-    String t2 = "(select * from nation_view_testunionall  limit 5)";
-    String t3 = "(select full_name, store_id from cp.`employee.json` limit 5)";
-    String t4 = "(select * from region_view_testunionall  limit 5)";
+      String t1 = "(select n_comment, n_regionkey from cp.`tpch/nation.parquet` limit 5)";
+      String t2 = "(select * from nation_view_testunionall_expandable_star  limit 5)";
+      String t3 = "(select full_name, store_id from cp.`employee.json` limit 5)";
+      String t4 = "(select * from region_view_testunionall_expandable_star  limit 5)";
 
-    String query1 = t1 + " union all " + t2 + " union all " + t3 + " union all " + t4;
+      String query1 = t1 + " union all " + t2 + " union all " + t3 + " union all " + t4;
 
-    try {
       testBuilder()
           .sqlQuery(query1)
           .unOrdered()
@@ -282,8 +287,8 @@ public class TestUnionAll extends BaseTestQuery {
           .baselineColumns("n_comment", "n_regionkey")
           .build().run();
     } finally {
-      test("drop view nation_view_testunionall");
-      test("drop view region_view_testunionall");
+      test("drop view if exists nation_view_testunionall_expandable_star");
+      test("drop view if exists region_view_testunionall_expandable_star");
     }
   }
 
@@ -1197,4 +1202,77 @@ public class TestUnionAll extends BaseTestQuery {
       .baselineValues("1", "2", "1", null, "a")
       .go();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testUnionAllRightEmptyDir() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM cp.`%s` UNION ALL SELECT key FROM dfs.tmp.`%s`",
+            rootSimple, EMPTY_DIR_NAME)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllLeftEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%s` UNION ALL SELECT key FROM cp.`%s`",
+            EMPTY_DIR_NAME, rootSimple)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllBothEmptyDirs() throws Exception {
+    final BatchSchema expectedSchema = new SchemaBuilder().build();
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM dfs.tmp.`%1$s`", EMPTY_DIR_NAME)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllMiddleEmptyDir() throws Exception {
+    final String query = "SELECT n_regionkey FROM cp.`tpch/nation.parquet` UNION ALL " +
+        "SELECT missing_key FROM dfs.tmp.`%s` UNION ALL SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+
+    testBuilder()
+        .sqlQuery(query, EMPTY_DIR_NAME)
+        .unOrdered()
+        .csvBaselineFile("testframework/testUnionAllQueries/q1.tsv")
+        .baselineTypes(TypeProtos.MinorType.INT)
+        .baselineColumns("n_regionkey")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testComplexQueryWithUnionAllAndEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM " +
+            "(SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM cp.`%2$s`)",
+            EMPTY_DIR_NAME, rootSimple)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
index 4b8140e..e6cf842 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
@@ -26,9 +26,11 @@ import org.apache.drill.categories.SqlTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -42,9 +44,12 @@ public class TestUnionDistinct extends BaseTestQuery {
   private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
   private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
 
+  private static final String EMPTY_DIR_NAME = "empty_directory";
+
   @BeforeClass
   public static void setupFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
   }
 
   @Test  // Simple Union over two scans
@@ -789,4 +794,75 @@ public class TestUnionDistinct extends BaseTestQuery {
     }
   }
 
+
+  @Test
+  public void testUnionRightEmptyDir() throws Exception {
+    String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM cp.`%s` UNION SELECT key FROM dfs.tmp.`%s`",
+            rootSimple, EMPTY_DIR_NAME)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build().run();
+  }
+
+  @Test
+  public void testUnionLeftEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%s` UNION SELECT key FROM cp.`%s`",
+            EMPTY_DIR_NAME, rootSimple)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionBothEmptyDirs() throws Exception {
+    final BatchSchema expectedSchema = new SchemaBuilder().build();
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM dfs.tmp.`%1$s`", EMPTY_DIR_NAME)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionMiddleEmptyDir() throws Exception {
+    final String query = "SELECT n_regionkey FROM cp.`tpch/nation.parquet` UNION " +
+        "SELECT missing_key FROM dfs.tmp.`%s` UNION SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+
+    testBuilder()
+        .sqlQuery(query, EMPTY_DIR_NAME)
+        .unOrdered()
+        .csvBaselineFile("testframework/unionDistinct/q1.tsv")
+        .baselineTypes(TypeProtos.MinorType.INT)
+        .baselineColumns("n_regionkey")
+        .build().run();
+  }
+
+  @Test
+  public void testComplexQueryWithUnionAndEmptyDir() throws Exception {
+    final String rootSimple = "/store/json/booleanData.json";
+
+    testBuilder()
+        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM " +
+                "(SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM cp.`%2$s`)",
+            EMPTY_DIR_NAME, rootSimple)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
index b691d94..d06151c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
@@ -26,17 +26,25 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.nio.file.Paths;
 import java.util.List;
 
 @Category(UnlikelyTest.class)
 public class TestEmptyInputSql extends BaseTestQuery {
 
-  public final String SINGLE_EMPTY_JSON = "/scan/emptyInput/emptyJson/empty.json";
-  public final String SINGLE_EMPTY_CSVH = "/scan/emptyInput/emptyCsvH/empty.csvh";
-  public final String SINGLE_EMPTY_CSV = "/scan/emptyInput/emptyCsv/empty.csv";
+  private static final String SINGLE_EMPTY_JSON = "/scan/emptyInput/emptyJson/empty.json";
+  private static final String SINGLE_EMPTY_CSVH = "/scan/emptyInput/emptyCsvH/empty.csvh";
+  private static final String SINGLE_EMPTY_CSV = "/scan/emptyInput/emptyCsv/empty.csv";
+  private static final String EMPTY_DIR_NAME = "empty_directory";
+
+  @BeforeClass
+  public static void setupTestFiles() {
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
+  }
 
   /**
    * Test with query against an empty file. Select clause has regular column reference, and an expression.
@@ -177,4 +185,33 @@ public class TestEmptyInputSql extends BaseTestQuery {
         .run();
   }
 
+  @Test
+  public void testEmptyDirectory() throws Exception {
+    final BatchSchema expectedSchema = new SchemaBuilder().build();
+
+    testBuilder()
+        .sqlQuery("select * from dfs.tmp.`%s`", EMPTY_DIR_NAME)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testEmptyDirectoryAndFieldInQuery() throws Exception {
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.INT) // field "key" is absent in schemaless table
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+        .sqlQuery("select key from dfs.tmp.`%s`", EMPTY_DIR_NAME)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
index 6d55a3b..3c6df29 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
@@ -25,11 +25,40 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+
 
 @Category(OperatorTest.class)
 public class JoinTestBase extends PlanTestBase {
 
-  private static final String testEmptyJoin = "select count(*) as cnt from cp.`employee.json` emp %s join dfs.`dept.json` " +
+  public static final String HJ_PATTERN = "HashJoin";
+  public static final String MJ_PATTERN = "MergeJoin";
+  public static final String NLJ_PATTERN = "NestedLoopJoin";
+  public static final String INNER_JOIN_TYPE = "joinType=\\[inner\\]";
+  public static final String LEFT_JOIN_TYPE = "joinType=\\[left\\]";
+  public static final String RIGHT_JOIN_TYPE = "joinType=\\[right\\]";
+
+  public static final String DISABLE_HJ =
+      String.format("alter session set `%s` = false", PlannerSettings.HASHJOIN.getOptionName());
+  public static final String ENABLE_HJ =
+      String.format("alter session set `%s` = true", PlannerSettings.HASHJOIN.getOptionName());
+  public static final String RESET_HJ =
+      String.format("alter session reset `%s`", PlannerSettings.HASHJOIN.getOptionName());
+  public static final String DISABLE_MJ =
+      String.format("alter session set `%s` = false", PlannerSettings.MERGEJOIN.getOptionName());
+  public static final String ENABLE_MJ =
+      String.format("alter session set `%s` = true", PlannerSettings.MERGEJOIN.getOptionName());
+  public static final String DISABLE_NLJ_SCALAR =
+      String.format("alter session set `%s` = false", PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+  public static final String ENABLE_NLJ_SCALAR =
+      String.format("alter session set `%s` = true", PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+  public static final String DISABLE_JOIN_OPTIMIZATION =
+      String.format("alter session set `%s` = false", PlannerSettings.JOIN_OPTIMIZATION.getOptionName());
+  public static final String RESET_JOIN_OPTIMIZATION =
+      String.format("alter session reset `%s`", PlannerSettings.JOIN_OPTIMIZATION.getOptionName());
+
+
+  private static final String TEST_EMPTY_JOIN = "select count(*) as cnt from cp.`employee.json` emp %s join dfs.`dept.json` " +
           "as dept on dept.manager = emp.`last_name`";
 
   /**
@@ -41,10 +70,10 @@ public class JoinTestBase extends PlanTestBase {
    * @param result number of the output rows.
    */
   public void testJoinWithEmptyFile(File testDir, String joinType,
-                         String joinPattern, long result) throws Exception {
+                         String[] joinPattern, long result) throws Exception {
     buildFile("dept.json", new String[0], testDir);
-    String query = String.format(testEmptyJoin, joinType);
-    testPlanMatchingPatterns(query, new String[]{joinPattern}, new String[]{});
+    String query = String.format(TEST_EMPTY_JOIN, joinType);
+    testPlanMatchingPatterns(query, joinPattern, new String[]{});
     testBuilder()
             .sqlQuery(query)
             .unOrdered()
@@ -60,4 +89,28 @@ public class JoinTestBase extends PlanTestBase {
       }
     }
   }
+
+  /**
+   * Sets the session options that enable or disable each join operator.
+   * @param hj whether to enable the hash join operator
+   * @param mj whether to enable the merge join operator
+   * @param nlj whether to enable the nested-loop join operator
+   * @throws Exception if an option cannot be set; the caller should then reset any options already set
+   */
+  public static void enableJoin(boolean hj, boolean mj, boolean nlj) throws Exception {
+    setSessionOption((PlannerSettings.HASHJOIN.getOptionName()), hj);
+    setSessionOption((PlannerSettings.MERGEJOIN.getOptionName()), mj);
+    setSessionOption((PlannerSettings.NESTEDLOOPJOIN.getOptionName()), nlj);
+    setSessionOption((PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName()), !(nlj));
+  }
+
+  /**
+   * Resets the session options that control the join operators.
+   */
+  public static void resetJoinOptions() {
+    resetSessionOption((PlannerSettings.HASHJOIN.getOptionName()));
+    resetSessionOption((PlannerSettings.MERGEJOIN.getOptionName()));
+    resetSessionOption((PlannerSettings.NESTEDLOOPJOIN.getOptionName()));
+    resetSessionOption((PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index 8110476..7219f0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -33,18 +33,15 @@ import java.io.FileWriter;
 @Category(OperatorTest.class)
 public class TestHashJoinAdvanced extends JoinTestBase {
 
-  private static final String HJ_PATTERN = "HashJoin";
-
-
   // Have to disable merge join, if this testcase is to test "HASH-JOIN".
   @BeforeClass
   public static void disableMergeJoin() throws Exception {
-    test("alter session set `planner.enable_mergejoin` = false");
+    test(DISABLE_MJ);
   }
 
   @AfterClass
   public static void enableMergeJoin() throws Exception {
-    test("alter session set `planner.enable_mergejoin` = true");
+    test(ENABLE_MJ);
   }
 
   @Test //DRILL-2197 Left Self Join with complex type in projection
@@ -108,7 +105,7 @@ public class TestHashJoinAdvanced extends JoinTestBase {
 
     testBuilder()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = true")
+        .optionSettingQueriesForTestQuery(ENABLE_HJ)
         .unOrdered()
         .baselineColumns("full_name")
         .baselineValues("Sheri Nowmer")
@@ -122,7 +119,7 @@ public class TestHashJoinAdvanced extends JoinTestBase {
 
     testBuilder()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = true")
+        .optionSettingQueriesForTestQuery(ENABLE_HJ)
         .unOrdered()
         .baselineColumns("bigint_col")
         .baselineValues(1L)
@@ -165,16 +162,16 @@ public class TestHashJoinAdvanced extends JoinTestBase {
 
   @Test
   public void testHashLeftJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", HJ_PATTERN, 1155L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", new String[] {HJ_PATTERN, LEFT_JOIN_TYPE}, 1155L);
   }
 
   @Test
   public void testHashInnerJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", HJ_PATTERN, 0L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", new String[] {HJ_PATTERN, INNER_JOIN_TYPE}, 0L);
   }
 
   @Test
   public void testHashRightJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", HJ_PATTERN, 0L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", new String[] {HJ_PATTERN, RIGHT_JOIN_TYPE}, 0L);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
new file mode 100644
index 0000000..77470d9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestJoinEmptyDirTable extends JoinTestBase {
+
+  private static final String EMPTY_DIRECTORY = "empty_directory";
+
+  @BeforeClass
+  public static void setupTestFiles() {
+    dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIRECTORY));
+  }
+
+  @Test
+  public void testHashInnerJoinWithLeftEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(true, false, false);
+      testPlanMatchingPatterns(query, new String[]{HJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testHashInnerJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(true, false, false);
+      testPlanMatchingPatterns(query, new String[]{HJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testHashInnerJoinWithBothEmptyDirTables() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(true, false, false);
+      testPlanMatchingPatterns(query, new String[]{HJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testHashLeftJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 1155;
+
+      enableJoin(true, false, false);
+      testPlanMatchingPatterns(query, new String[]{HJ_PATTERN, LEFT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testHashRightJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(true, false, false);
+      testPlanMatchingPatterns(query, new String[]{HJ_PATTERN, RIGHT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testMergeInnerJoinWithLeftEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, true, false);
+      testPlanMatchingPatterns(query, new String[]{MJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testMergeInnerJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, true, false);
+      testPlanMatchingPatterns(query, new String[]{MJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testMergeInnerJoinWithBothEmptyDirTables() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, true, false);
+      testPlanMatchingPatterns(query, new String[]{MJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testMergeLeftJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 1155;
+
+      enableJoin(false, true, false);
+      testPlanMatchingPatterns(query, new String[]{MJ_PATTERN, LEFT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testMergeRightJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, true, false);
+      testPlanMatchingPatterns(query, new String[]{MJ_PATTERN, RIGHT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testNestedLoopInnerJoinWithLeftEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, false, true);
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testNestedLoopInnerJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, false, true);
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testNestedLoopInnerJoinWithBothEmptyDirTables() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, false, true);
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, INNER_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testNestedLoopLeftJoinWithLeftEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%s` t1 left join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 0;
+
+      enableJoin(false, false, true);
+      // See the description of PlannerSettings.JOIN_OPTIMIZATION for details.
+      setSessionOption((PlannerSettings.JOIN_OPTIMIZATION.getOptionName()), false);
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, LEFT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+      resetSessionOption((PlannerSettings.JOIN_OPTIMIZATION.getOptionName()));
+
+    }
+  }
+
+  @Test
+  public void testNestedLoopLeftJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 1155;
+
+      enableJoin(false, false, true);
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, LEFT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test
+  public void testNestedLoopRightJoinWithLeftEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from dfs.tmp.`%s` t1 right join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+      final int expectedRecordCount = 1155;
+
+      enableJoin(false, false, true);
+      // The left side's output is smaller than the right side's, so during the optimization phase the RIGHT JOIN is converted to a LEFT JOIN.
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, LEFT_JOIN_TYPE}, new String[]{});
+      final int actualRecordCount = testSql(query);
+      assertEquals("Number of output rows", expectedRecordCount, actualRecordCount);
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testNestedLoopRightJoinWithRightEmptyDirTable() throws Exception {
+    try {
+      String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
+          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+
+      enableJoin(false, false, true);
+      // The nested loop join physical operator does not support the "RIGHT OUTER JOIN" logical join operator.
+      test(query);
+    } catch (UserRemoteException e) {
+      assertTrue("Not expected exception is obtained while performing the query with RIGHT JOIN logical operator " +
+              "by using nested loop join physical operator",
+          e.getMessage().contains("SYSTEM ERROR: CannotPlanException"));
+      throw e;
+    } finally {
+      resetJoinOptions();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index 488e60a..147323d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -40,7 +40,6 @@ import java.util.Random;
 public class TestMergeJoinAdvanced extends JoinTestBase {
   private static final String LEFT = "merge-join-left.json";
   private static final String RIGHT = "merge-join-right.json";
-  private static final String MJ_PATTERN = "MergeJoin";
 
 
   private static File leftFile;
@@ -52,8 +51,8 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
   // Have to disable hash join to test merge join in this class
   @BeforeClass
-  public static void disableMergeJoin() throws Exception {
-    test("alter session set `planner.enable_hashjoin` = false");
+  public static void enableMergeJoin() throws Exception {
+    test(DISABLE_HJ);
 
     leftFile = new File(dirTestWatcher.getRootDir(), LEFT);
     rightFile = new File(dirTestWatcher.getRootDir(), RIGHT);
@@ -62,8 +61,8 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
   }
 
   @AfterClass
-  public static void enableMergeJoin() throws Exception {
-    test("alter session set `planner.enable_hashjoin` = true");
+  public static void disableMergeJoin() throws Exception {
+    test(ENABLE_HJ);
   }
 
   @Test
@@ -73,7 +72,7 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
     testBuilder()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = true")
+        .optionSettingQueriesForTestQuery(ENABLE_HJ)
         .unOrdered()
         .baselineColumns("full_name")
         .baselineValues("Sheri Nowmer")
@@ -87,7 +86,7 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
     testBuilder()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = true")
+        .optionSettingQueriesForTestQuery(ENABLE_HJ)
         .unOrdered()
         .baselineColumns("bigint_col")
         .baselineValues(1l)
@@ -148,7 +147,7 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
       LEFT, joinType, RIGHT);
     testBuilder()
       .sqlQuery(query1)
-      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+      .optionSettingQueriesForTestQuery(DISABLE_HJ)
       .unOrdered()
       .baselineColumns("c1")
       .baselineValues(expected)
@@ -249,7 +248,7 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
       LEFT, "inner", RIGHT);
     testBuilder()
       .sqlQuery(query1)
-      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+      .optionSettingQueriesForTestQuery(DISABLE_HJ)
       .unOrdered()
       .baselineColumns("c1")
       .baselineValues(6000*800L)
@@ -258,16 +257,16 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
   @Test
   public void testMergeLeftJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(),"left outer", MJ_PATTERN, 1155L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(),"left outer", new String[] {MJ_PATTERN, LEFT_JOIN_TYPE}, 1155L);
   }
 
   @Test
   public void testMergeInnerJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", MJ_PATTERN, 0L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", new String[] {MJ_PATTERN, INNER_JOIN_TYPE}, 0L);
   }
 
   @Test
   public void testMergeRightJoinWithEmptyTable() throws Exception {
-    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", MJ_PATTERN, 0L);
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", new String[] {MJ_PATTERN, RIGHT_JOIN_TYPE}, 0L);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
index 092a1a7..6701e57 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -20,29 +20,18 @@ package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.rpc.RpcException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import java.nio.file.Paths;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 @Category(OperatorTest.class)
 public class TestNestedLoopJoin extends JoinTestBase {
 
-  private static final String NLJ_PATTERN = "NestedLoopJoin";
-
-  private static final String DISABLE_HJ = "alter session set `planner.enable_hashjoin` = false";
-  private static final String ENABLE_HJ = "alter session set `planner.enable_hashjoin` = true";
-  private static final String RESET_HJ = "alter session reset `planner.enable_hashjoin`";
-  private static final String DISABLE_MJ = "alter session set `planner.enable_mergejoin` = false";
-  private static final String ENABLE_MJ = "alter session set `planner.enable_mergejoin` = true";
-  private static final String DISABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = false";
-  private static final String ENABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = true";
-  private static final String DISABLE_JOIN_OPTIMIZATION = "alter session set `planner.enable_join_optimization` = false";
-  private static final String RESET_JOIN_OPTIMIZATION = "alter session reset `planner.enable_join_optimization`";
-
   // Test queries used by planning and execution tests
   private static final String testNlJoinExists_1 = "select r_regionkey from cp.`tpch/region.parquet` "
       + " where exists (select n_regionkey from cp.`tpch/nation.parquet` "
@@ -335,30 +324,35 @@ public class TestNestedLoopJoin extends JoinTestBase {
   @Test
   public void testNestedLeftJoinWithEmptyTable() throws Exception {
     try {
-      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
-      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", NLJ_PATTERN, 1155L);
+      enableJoin(false, false, true);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", new String[] {NLJ_PATTERN, LEFT_JOIN_TYPE}, 1155L);
     } finally {
-      resetSessionOption(PlannerSettings.HASHJOIN.getOptionName());
+      resetJoinOptions();
     }
   }
 
   @Test
   public void testNestedInnerJoinWithEmptyTable() throws Exception {
     try {
-      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
-      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", NLJ_PATTERN, 0L);
+      enableJoin(false, false, true);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", new String[] {NLJ_PATTERN, INNER_JOIN_TYPE}, 0L);
     } finally {
-      resetSessionOption(PlannerSettings.HASHJOIN.getOptionName());
+      resetJoinOptions();
     }
   }
 
-  @Test
-  public void testNestRightJoinWithEmptyTable() throws Exception {
+  @Test(expected = RpcException.class)
+  public void testNestedRightJoinWithEmptyTable() throws Exception {
     try {
-      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
-      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", NLJ_PATTERN, 0L);
+      enableJoin(false, false, true);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", new String[] {NLJ_PATTERN, RIGHT_JOIN_TYPE}, 0L);
+    } catch (RpcException e) {
+      assertTrue("Not expected exception is obtained while performing the query with RIGHT JOIN logical operator " +
+          "by using nested loop join physical operator",
+          e.getMessage().contains("SYSTEM ERROR: CannotPlanException"));
+      throw e;
     } finally {
-      resetSessionOption(PlannerSettings.HASHJOIN.getOptionName());
+      resetJoinOptions();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
index ec3f202..787584d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,9 +19,7 @@ package org.apache.drill.exec.store.dfs;
 
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
-import java.nio.file.Paths;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
@@ -48,20 +46,6 @@ public class TestFileSelection extends BaseTestQuery {
     }
   }
 
-  @Test(expected = Exception.class)
-  public void testEmptyFolderThrowsTableNotFound() throws Exception {
-    final String emptyDirPath = dirTestWatcher.makeRootSubDir(Paths.get("empty")).getAbsolutePath();
-    final String query = String.format("select * from dfs.`%s`", emptyDirPath);
-    try {
-      testNoResult(query);
-    } catch (Exception ex) {
-      final String pattern = String.format("%s' not found", emptyDirPath).toLowerCase();
-      final boolean isTableNotFound = ex.getMessage().toLowerCase().contains(pattern);
-      assertTrue(isTableNotFound);
-      throw ex;
-    }
-  }
-
   @Test
   public void testBackPathBad() throws Exception {
     final String[][] badPaths =

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestSchemaNotFoundException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestSchemaNotFoundException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestSchemaNotFoundException.java
index 194b78b..2fe44c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestSchemaNotFoundException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestSchemaNotFoundException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -70,7 +70,7 @@ public class TestSchemaNotFoundException extends BaseTestQuery {
 
     @Test(expected = Exception.class)
     public void testTableNotFoundException() throws Exception {
-        final String table = String.format("%s/empty1", TestTools.WORKING_PATH.resolve(TestTools.TEST_RESOURCES));
+        final String table = String.format("%s/missing.parquet", TestTools.WORKING_PATH.resolve(TestTools.TEST_RESOURCES));
         final String query = String.format("select * from tmp.`%s`", table);
         try {
             testNoResult("use dfs");

http://git-wip-us.apache.org/repos/asf/drill/blob/f3020081/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
index 6dd2e66..8d9d4fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,11 +18,9 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
 
 public class TestParquetGroupScan extends BaseTestQuery {
 
@@ -56,65 +54,49 @@ public class TestParquetGroupScan extends BaseTestQuery {
   public void testFix4376() throws Exception {
     prepareTables("4376_1", true);
 
-    testBuilder()
-      .sqlQuery("SELECT COUNT(*) AS `count` FROM dfs.tmp.`4376_1/60*`")
-      .ordered()
-      .baselineColumns("count").baselineValues(1984L)
-      .go();
+    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_1/60*`");
+    int expectedRecordCount = 1984;
+    assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
 
   @Test
   public void testWildCardEmptyWithCache() throws Exception {
     prepareTables("4376_2", true);
 
-    try {
-      runSQL("SELECT COUNT(*) AS `count` FROM dfs.tmp.`4376_2/604*`");
-      fail("Query should've failed!");
-    } catch (UserRemoteException uex) {
-      final String expectedMsg = "The table you tried to query is empty";
-      assertTrue(String.format("Error message should contain \"%s\" but was instead \"%s\"", expectedMsg,
-        uex.getMessage()), uex.getMessage().contains(expectedMsg));
-    }
+    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_2/604*`");
+    int expectedRecordCount = 0;
+    assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
 
   @Test
   public void testWildCardEmptyNoCache() throws Exception {
     prepareTables("4376_3", false);
 
-    try {
-      runSQL("SELECT COUNT(*) AS `count` FROM dfs.tmp.`4376_3/604*`");
-      fail("Query should've failed!");
-    } catch (UserRemoteException uex) {
-      final String expectedMsg = "Object '4376_3/604*' not found within 'dfs.tmp'";
-      assertTrue(String.format("Error message should contain \"%s\" but was instead \"%s\"", expectedMsg,
-        uex.getMessage()), uex.getMessage().contains(expectedMsg));
-    }
+    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_3/604*`");
+    int expectedRecordCount = 0;
+    assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
 
   @Test
   public void testSelectEmptyWithCache() throws Exception {
     prepareTables("4376_4", true);
 
-    try {
-      runSQL("SELECT COUNT(*) AS `count` FROM dfs.tmp.`4376_4/6041`");
-      fail("Query should've failed!");
-    } catch (UserRemoteException uex) {
-      final String expectedMsg = "The table you tried to query is empty";
-      assertTrue(String.format("Error message should contain \"%s\" but was instead \"%s\"", expectedMsg,
-        uex.getMessage()), uex.getMessage().contains(expectedMsg));
-    }
+    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_4/6041`");
+    int expectedRecordCount = 0;
+    assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
 
   @Test
   public void testSelectEmptyNoCache() throws Exception {
     prepareTables("4376_5", false);
-    try {
-      runSQL("SELECT COUNT(*) AS `count` FROM dfs.tmp.`4376_5/6041`");
-      fail("Query should've failed!");
-    } catch (UserRemoteException uex) {
-      final String expectedMsg = "Object '4376_5/6041' not found within 'dfs.tmp'";
-      assertTrue(String.format("Error message should contain \"%s\" but was instead \"%s\"", expectedMsg,
-        uex.getMessage()), uex.getMessage().contains(expectedMsg));
-    }
+
+    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_5/6041`");
+    int expectedRecordCount = 0;
+    assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
 }