Posted to commits@asterixdb.apache.org by wy...@apache.org on 2021/11/15 16:41:01 UTC

[asterixdb] branch master updated: [ASTERIXDB-2983][EXT] Warn on no-files for Parquet instead of failing

This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 35fd66d  [ASTERIXDB-2983][EXT] Warn on no-files for Parquet instead of failing
35fd66d is described below

commit 35fd66de55fb1727c435336c9dd27146d2ffd0de
Author: Wail Alkowaileet <wa...@gmail.com>
AuthorDate: Sun Nov 14 16:46:37 2021 -0800

    [ASTERIXDB-2983][EXT] Warn on no-files for Parquet instead of failing
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - The Parquet adapter fails (IOException) if the external source
      returns no files
    - We should warn instead of fail, for consistency with other
      file formats (e.g., JSON)
    - This patch also fixes inappropriate calls to
      IWarningCollector#shouldWarn()
    
    Change-Id: Ie929046ef01ea5eee60d23f6e4665ac6727e2d1e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14063
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wael Alkowaileet <wa...@gmail.com>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
---
 .../common/parquet/no-files/no-files.1.ddl.sqlpp   | 42 ++++++++++++++
 .../common/parquet/no-files/no-files.2.query.sqlpp | 30 ++++++++++
 .../common/parquet/no-files/no-files.02.adm        |  1 +
 .../runtimets/testsuite_external_dataset_s3.xml    |  9 +++
 .../external/input/HDFSDataSourceFactory.java      | 22 +++++---
 .../aws/parquet/AwsS3ParquetReaderFactory.java     |  2 +-
 .../parquet/AzureBlobParquetReaderFactory.java     |  2 +-
 .../reader/hdfs/HDFSLookupReaderFactory.java       |  2 +-
 .../parquet/AsterixTypeToParquetTypeVisitor.java   |  6 +-
 .../hdfs/parquet/ParquetFileRecordReader.java      |  6 +-
 .../reader/hdfs/parquet/ParquetReadSupport.java    |  6 +-
 .../external/util/ExternalDataConstants.java       |  1 -
 .../apache/asterix/external/util/HDFSUtils.java    | 23 +++++---
 .../apache/hyracks/hdfs/scheduler/Scheduler.java   | 66 +++++++++-------------
 14 files changed, 149 insertions(+), 69 deletions(-)
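
A note on the IWarningCollector#shouldWarn() cleanup: rather than sampling shouldWarn() once at configure time and threading the boolean through the Hadoop configuration, the patch always collects warnings and consults shouldWarn() immediately before each warn() call. A minimal sketch of the resulting report-side pattern (IWarningCollector and Warning are the actual Hyracks types; the decodedWarnings loop is illustrative):

    // Check the collector's budget right before each report, not ahead of
    // time: the budget can change between configuration and reporting.
    for (Warning warning : decodedWarnings) {
        if (warningCollector.shouldWarn()) {
            warningCollector.warn(warning);
        }
    }

This is exactly the shape of the issueWarnings() change in HDFSUtils.java below.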

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp
new file mode 100644
index 0000000..2fc3e62
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : No files
+* Expected Res : Warning: The provided external dataset configuration returned no files from the external source
+* Date         : November 14th 2021
+*/
+
+-- param max-warnings:json=1000
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+  %template%,
+  ("container"="playground"),
+  ("definition"="NOT_A_DEFINITION"),
+  ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp
new file mode 100644
index 0000000..37de163
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : No files
+* Expected Res : Warning: The provided external dataset configuration returned no files from the external source
+* Date         : November 14th 2021
+*/
+
+-- param max-warnings:json=1000
+
+USE test;
+
+SELECT VALUE COUNT(*)
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 96d34ec..6369c15 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -163,6 +163,15 @@
         <expected-error>ASX1161: Type 'ParquetType' contains declared fields, which is not supported for 'parquet' format</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="common/parquet/no-files">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/parquet/no-files</output-dir>
+        <source-location>false</source-location>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+      </compilation-unit>
+    </test-case>
     <!-- Parquet Tests End -->
     <test-case FilePath="external-dataset">
       <compilation-unit name="common/empty-string-definition">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f15f735..8ea9ed4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -54,6 +54,7 @@ import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -86,16 +87,16 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
-        JobConf hdfsConf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+        JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
         configureHdfsConf(hdfsConf, configuration);
     }
 
-    protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration, boolean shouldWarn)
+    protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration)
             throws HyracksDataException {
         this.serviceCtx = serviceCtx;
         this.configuration = configuration;
         init((ICCServiceContext) serviceCtx);
-        return HDFSUtils.configureHDFSJobConf(configuration, shouldWarn);
+        return HDFSUtils.configureHDFSJobConf(configuration);
     }
 
     protected void configureHdfsConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
@@ -106,7 +107,7 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd
             // if files list was set, we restrict the splits to the list
             InputSplit[] inputSplits;
             if (files == null) {
-                inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+                inputSplits = getInputSplits(conf, numPartitions);
             } else {
                 inputSplits = HDFSUtils.getSplits(conf, files);
             }
@@ -119,12 +120,12 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd
             read = new boolean[readSchedule.length];
             Arrays.fill(read, false);
             String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
-            if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)
-                    || formatString.equals(ExternalDataConstants.FORMAT_NOOP)
-                    || formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) {
+            if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
                 RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
                 this.recordClass = reader.createValue().getClass();
                 reader.close();
+            } else if (formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) {
+                recordClass = IValueReference.class;
             } else {
                 recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
                 this.recordClass = char[].class;
@@ -134,6 +135,13 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd
         }
     }
 
+    private InputSplit[] getInputSplits(JobConf conf, int numPartitions) throws IOException {
+        if (HDFSUtils.isEmpty(conf)) {
+            return Scheduler.EMPTY_INPUT_SPLITS;
+        }
+        return conf.getInputFormat().getSplits(conf, numPartitions);
+    }
+
     // Used to tell the factory to restrict the splits to the intersection between this list and
     // the actual files on hdfs
     @Override
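
For context on the guard above: Hadoop's old-API FileInputFormat refuses to compute splits when no input paths are configured, which is most likely the IOException the commit message refers to. A standalone reproduction sketch (real Hadoop mapred classes; the bare JobConf stands in for a Parquet adapter whose external definition matched no files):

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class NoInputPathsDemo {
        public static void main(String[] args) throws IOException {
            JobConf conf = new JobConf();
            TextInputFormat inputFormat = new TextInputFormat();
            inputFormat.configure(conf);
            // No input directory was ever set on conf, so the listStatus()
            // call inside getSplits() throws an IOException
            // ("No input paths specified in job").
            inputFormat.getSplits(conf, 1);
        }
    }

The new getInputSplits() guard sidesteps this by returning the shared empty sentinel before the input format is ever consulted.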
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 85d8671..803e657 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -50,7 +50,7 @@ public class AwsS3ParquetReaderFactory extends HDFSDataSourceFactory {
         putS3ConfToHadoopConf(configuration, path);
 
         //Configure Hadoop S3 input splits
-        JobConf conf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+        JobConf conf = createHdfsConf(serviceCtx, configuration);
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
         ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 0f9f484..ee765ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -54,7 +54,7 @@ public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory {
         putAzureBlobConfToHadoopConf(configuration, path);
 
         //Configure Hadoop Azure input splits
-        JobConf conf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+        JobConf conf = createHdfsConf(serviceCtx, configuration);
         ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index 8fbe5c4..75d431d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -65,7 +65,7 @@ public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
             IWarningCollector warningCollector) throws AsterixException {
         this.serviceCtx = serviceCtx;
         this.configuration = configuration;
-        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration, warningCollector.shouldWarn());
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
         try {
             confFactory = new ConfFactory(conf);
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index d596493..c0a47d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -49,8 +49,8 @@ public class AsterixTypeToParquetTypeVisitor implements IATypeVisitor<Type, Type
     private final List<Warning> warnings;
     private Map<String, FunctionCallInformation> funcInfo;
 
-    public AsterixTypeToParquetTypeVisitor(boolean shouldWarn) {
-        warnings = shouldWarn ? new ArrayList<>() : null;
+    public AsterixTypeToParquetTypeVisitor() {
+        warnings = new ArrayList<>();
     }
 
     public MessageType clipType(ARecordType rootType, MessageType fileSchema,
@@ -161,7 +161,7 @@ public class AsterixTypeToParquetTypeVisitor implements IATypeVisitor<Type, Type
         ATypeTag expectedType = node.getTypeTag();
 
         boolean isNotExpected = actualType != expectedType;
-        if (warnings != null && isNotExpected) {
+        if (isNotExpected) {
             //typeName is unique
             FunctionCallInformation info = funcInfo.get(node.getTypeName());
             //If no warning is created, then it means it has been reported
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
index b06afd3..cc9b34c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
@@ -51,10 +51,8 @@ public class ParquetFileRecordReader<V extends IValueReference> extends Abstract
     @Override
     public void close() throws IOException {
         super.close();
-        if (warningCollector.shouldWarn()) {
-            //report warnings
-            HDFSUtils.issueWarnings(warningCollector, conf);
-        }
+        //Issue warnings if any were reported
+        HDFSUtils.issueWarnings(warningCollector, conf);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index b2a5eeb..aac293d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -52,8 +51,7 @@ public class ParquetReadSupport extends ReadSupport<IValueReference> {
     private static MessageType getRequestedSchema(InitContext context) {
         Configuration configuration = context.getConfiguration();
         MessageType fileSchema = context.getFileSchema();
-        boolean shouldWarn = configuration.getBoolean(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_ENABLED, false);
-        AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor(shouldWarn);
+        AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor();
         try {
             ARecordType expectedType = HDFSUtils.getExpectedType(configuration);
             Map<String, FunctionCallInformation> functionCallInformationMap =
@@ -61,7 +59,7 @@ public class ParquetReadSupport extends ReadSupport<IValueReference> {
             MessageType requestedType = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
             List<Warning> warnings = visitor.getWarnings();
 
-            if (shouldWarn && !warnings.isEmpty()) {
+            if (!warnings.isEmpty()) {
                 //New warnings were created, set the warnings in hadoop configuration to be reported
                 HDFSUtils.setWarnings(warnings, configuration);
                 //Update the reported warnings so that we do not report the same warning again
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index ec25997..16a0f66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -60,7 +60,6 @@ public class ExternalDataConstants {
     public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
-    public static final String KEY_HADOOP_ASTERIX_WARNINGS_ENABLED = "org.apache.asterix.warnings.enabled";
     //Base64 encoded warnings issued from Hadoop
     public static final String KEY_HADOOP_ASTERIX_WARNINGS_LIST = "org.apache.asterix.warnings.list";
     //Disable caching FileSystem for Hadoop
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 9f65cd7..28a0766 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -201,7 +201,7 @@ public class HDFSUtils {
         }
     }
 
-    public static JobConf configureHDFSJobConf(Map<String, String> configuration, boolean shouldWarn) {
+    public static JobConf configureHDFSJobConf(Map<String, String> configuration) {
         JobConf conf = new JobConf();
         String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
         String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
@@ -235,11 +235,6 @@ public class HDFSUtils {
                 //Subset of the values were requested, set the functionCallInformation
                 conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
                         configuration.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION));
-                /*
-                 * Allows Parquet to issue warnings in case we found type mismatches (if warnings are enabled).
-                 * Warnings will be issued during the type matching of Parquet's schema with the requested schema
-                 */
-                conf.setBoolean(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_ENABLED, shouldWarn);
             }
             conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, requestedValues);
         }
@@ -320,7 +315,7 @@ public class HDFSUtils {
 
     public static void issueWarnings(IWarningCollector warningCollector, Configuration conf) throws IOException {
         String warnings = conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST, "");
-        if (!warnings.isEmpty() && warningCollector.shouldWarn()) {
+        if (!warnings.isEmpty()) {
             String[] encodedWarnings = warnings.split(",");
             Base64.Decoder decoder = Base64.getDecoder();
             for (int i = 0; i < encodedWarnings.length; i++) {
@@ -330,7 +325,9 @@ public class HDFSUtils {
                  */
                 byte[] warningBytes = decoder.decode(encodedWarnings[i]);
                 DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(warningBytes));
-                warningCollector.warn(Warning.create(dataInputStream));
+                if (warningCollector.shouldWarn()) {
+                    warningCollector.warn(Warning.create(dataInputStream));
+                }
             }
             //Remove reported warnings
             conf.unset(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST);
@@ -348,4 +345,14 @@ public class HDFSUtils {
         conf.set(String.format(ExternalDataConstants.KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE, protocol),
                 ExternalDataConstants.TRUE);
     }
+
+    /**
+     * Check whether the job has any input path set
+     *
+     * @param job Hadoop job configuration
+     * @return <code>true</code> if no input path is set, <code>false</code> otherwise
+     */
+    public static boolean isEmpty(JobConf job) {
+        return job.get(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "").isEmpty();
+    }
 }
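
Also worth noting in this file: the transport that carries warnings out of Parquet's ReadSupport, which runs without a handle to the task's IWarningCollector. Warnings are serialized, Base64-encoded, and joined with commas under a single Hadoop conf key (KEY_HADOOP_ASTERIX_WARNINGS_LIST); issueWarnings() then splits, decodes, and reports them. A self-contained round-trip sketch of that encoding (plain strings stand in for the serialized Warning bytes):

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.StringJoiner;

    public class WarningTransportSketch {
        public static void main(String[] args) {
            // Encode side (the shape of HDFSUtils#setWarnings): Base64 each
            // payload, then join with commas. Base64's alphabet contains no
            // comma, so the separator can never collide with a payload.
            StringJoiner joined = new StringJoiner(",");
            for (String payload : new String[] { "warning-1", "warning-2" }) {
                joined.add(Base64.getEncoder()
                        .encodeToString(payload.getBytes(StandardCharsets.UTF_8)));
            }
            String confValue = joined.toString();

            // Decode side (the shape of issueWarnings() above): split on
            // commas and decode each entry back to its original bytes.
            for (String encoded : confValue.split(",")) {
                System.out.println(new String(Base64.getDecoder().decode(encoded),
                        StandardCharsets.UTF_8));
            }
        }
    }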
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index bc187f8..b9d68f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -49,16 +49,21 @@ import org.apache.logging.log4j.Logger;
  * class works for Hadoop old API.
  */
 public class Scheduler {
+    /**
+     * Empty input splits
+     */
+    public static final InputSplit[] EMPTY_INPUT_SPLITS = {};
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     /** a list of NCs */
     private String[] NCs;
 
     /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+    private Map<String, List<String>> ipToNcMapping = new HashMap<>();
 
     /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+    private Map<String, Integer> ncNameToIndex = new HashMap<>();
 
     /** a map from NC name to the NodeControllerInfo */
     private Map<String, NodeControllerInfo> ncNameToNcInfos;
@@ -108,8 +113,7 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
@@ -121,10 +125,8 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param topology
-     *            the hyracks cluster toplogy
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @param topology        the hyracks cluster topology
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
@@ -137,8 +139,7 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
@@ -156,14 +157,14 @@ public class Scheduler {
      * @throws HyracksDataException
      */
     public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
-        if (splits == null) {
+        if (splits == null || splits == EMPTY_INPUT_SPLITS) {
             /** handle the case where the splits array is null or empty */
             return new String[] {};
         }
         int[] workloads = new int[NCs.length];
         Arrays.fill(workloads, 0);
         String[] locations = new String[splits.length];
-        Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+        Map<String, IntWritable> locationToNumOfSplits = new HashMap<>();
         /**
          * upper bound number of slots that a machine can get
          */
@@ -217,16 +218,11 @@ public class Scheduler {
     /**
      * Schedule non-local slots to each machine
      *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slotLimit
-     *            The maximum slots of each machine.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
+     * @param splits    The HDFS file splits.
+     * @param workloads The current capacity of each machine.
+     * @param locations The result schedule.
+     * @param slotLimit The maximum slots of each machine.
+     * @param scheduled Indicate which slot is scheduled.
      */
     private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
             boolean[] scheduled) throws IOException, UnknownHostException {
@@ -259,18 +255,12 @@ public class Scheduler {
     /**
      * Schedule data-local slots to each machine.
      *
-     * @param splits
-     *            The HDFS file splits.
-     * @param workloads
-     *            The current capacity of each machine.
-     * @param locations
-     *            The result schedule.
-     * @param slots
-     *            The maximum slots of each machine.
-     * @param random
-     *            The random generator.
-     * @param scheduled
-     *            Indicate which slot is scheduled.
+     * @param splits    The HDFS file splits.
+     * @param workloads The current capacity of each machine.
+     * @param locations The result schedule.
+     * @param slots     The maximum slots of each machine.
+     * @param random    The random generator.
+     * @param scheduled Indicate which slot is scheduled.
      * @throws IOException
      * @throws UnknownHostException
      */
@@ -278,7 +268,7 @@ public class Scheduler {
             boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
             throws IOException, UnknownHostException {
         /** scheduling candidates will be ordered inversely according to their popularity */
-        PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
+        PriorityQueue<String> scheduleCadndiates = new PriorityQueue<>(3, new Comparator<>() {
 
             @Override
             public int compare(String s1, String s2) {
@@ -346,10 +336,8 @@ public class Scheduler {
     /**
      * Scan the splits once and build a popularity map
      *
-     * @param splits
-     *            the split array
-     * @param locationToNumOfSplits
-     *            the map to be built
+     * @param splits                the split array
+     * @param locationToNumOfSplits the map to be built
      * @throws IOException
      */
     private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
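
Finally, how the pieces fit together end to end: HDFSDataSourceFactory hands Scheduler.EMPTY_INPUT_SPLITS to the scheduler when the source matched nothing, and getLocationConstraints() recognizes the sentinel by identity (hence the deliberate == comparison above) and returns no constraints. A sketch of that flow, using only names introduced or touched by this patch (conf, numPartitions, and scheduler as in HDFSDataSourceFactory):

    import java.io.IOException;

    import org.apache.asterix.external.util.HDFSUtils;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.hdfs.scheduler.Scheduler;

    public class EmptySourceFlowSketch {
        // Mirrors HDFSDataSourceFactory#getInputSplits plus the scheduling call.
        static String[] constraintsFor(JobConf conf, int numPartitions, Scheduler scheduler)
                throws IOException {
            InputSplit[] splits = HDFSUtils.isEmpty(conf)
                    ? Scheduler.EMPTY_INPUT_SPLITS
                    : conf.getInputFormat().getSplits(conf, numPartitions);
            // For the sentinel (or null), the scheduler returns an empty
            // array instead of trying to place splits on node controllers.
            return scheduler.getLocationConstraints(splits);
        }
    }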