You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/03/04 04:51:19 UTC

[druid] branch master updated: Skip empty files for local, hdfs, and cloud input sources (#9450)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9466ac7  Skip empty files for local, hdfs, and cloud input sources (#9450)
9466ac7 is described below

commit 9466ac7c9b2b1d86b06cfa7c7f5eeb35d588dd22
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 3 20:51:06 2020 -0800

    Skip empty files for local, hdfs, and cloud input sources (#9450)
    
    * Skip empty files for local, hdfs, and cloud input sources
    
    * split hint spec doc
    
    * doc for skipping empty files
    
    * fix typo; adjust tests
    
    * unnecessary fluent iterable
    
    * address comments
    
    * fix test
    
    * use the right lists
    
    * fix test
    
    * fix test
---
 .../org/apache/druid/data/input/InputEntity.java   |   2 +
 .../druid/data/input/MaxSizeSplitHintSpec.java     |  11 +-
 .../druid/data/input/impl/InlineInputSource.java   |   2 +
 .../druid/data/input/impl/LocalInputSource.java    |  12 +-
 .../druid/data/input/MaxSizeSplitHintSpecTest.java |  25 +-
 .../data/input/impl/LocalInputSourceTest.java      |  44 ++-
 docs/ingestion/native-batch.md                     |  40 +--
 .../storage/azure/AzureCloudBlobIterator.java      |   2 +-
 .../data/input/azure/AzureInputSourceTest.java     |   2 +-
 .../storage/azure/AzureCloudBlobIteratorTest.java  |  61 ++++-
 .../google/GoogleCloudStorageInputSource.java      |   4 +-
 .../apache/druid/storage/google/GoogleUtils.java   | 105 +-------
 .../storage/google/ObjectStorageIterator.java      | 129 +++++++++
 .../storage/google/ObjectStorageIteratorTest.java  | 297 +++++++++++++++++++++
 .../druid/inputsource/hdfs/HdfsInputSource.java    |   1 +
 .../druid/storage/s3/ObjectSummaryIterator.java    |   4 +-
 .../druid/data/input/s3/S3InputSourceTest.java     |  54 +++-
 .../storage/s3/ObjectSummaryIteratorTest.java      |  16 +-
 .../s3/S3TimestampVersionedDataFinderTest.java     |   4 +
 19 files changed, 668 insertions(+), 147 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 470e040..71f14d9 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -35,6 +35,8 @@ import java.net.URI;
 
 /**
  * InputEntity abstracts an input entity and knows how to read bytes from the given entity.
+ * Since the implementations of this interface assume that the given entity is not empty, the InputSources
+ * should not create InputEntities for empty entities.
  */
 @UnstableApi
 public interface InputEntity
diff --git a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
index 4f810e2..4b834c1 100644
--- a/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -61,6 +62,10 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
   @Override
   public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
   {
+    final Iterator<T> nonEmptyFileOnlyIterator = Iterators.filter(
+        inputIterator,
+        input -> inputAttributeExtractor.apply(input).getSize() > 0
+    );
     return new Iterator<List<T>>()
     {
       private T peeking;
@@ -68,7 +73,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
       @Override
       public boolean hasNext()
       {
-        return peeking != null || inputIterator.hasNext();
+        return peeking != null || nonEmptyFileOnlyIterator.hasNext();
       }
 
       @Override
@@ -79,9 +84,9 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
         }
         final List<T> current = new ArrayList<>();
         long splitSize = 0;
-        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+        while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
           if (peeking == null) {
-            peeking = inputIterator.next();
+            peeking = nonEmptyFileOnlyIterator.next();
           }
           final long size = inputAttributeExtractor.apply(peeking).getSize();
           if (current.isEmpty() || splitSize + size < maxSplitSize) {
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
index a99f7cd..1e7b59f 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
@@ -38,6 +39,7 @@ public class InlineInputSource extends AbstractInputSource
   @JsonCreator
   public InlineInputSource(@JsonProperty("data") String data)
   {
+    Preconditions.checkArgument(data != null && !data.isEmpty(), "empty data");
     this.data = data;
   }
 
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
index 65b0f61..04b7dc7 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
@@ -125,10 +125,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
   @VisibleForTesting
   Iterator<File> getFileIterator()
   {
-    return Iterators.concat(
-        getDirectoryListingIterator(),
-        getFilesListIterator()
-    );
+    return
+        Iterators.filter(
+            Iterators.concat(
+                getDirectoryListingIterator(),
+                getFilesListIterator()
+            ),
+            file -> file.length() > 0
+        );
   }
 
   private Iterator<File> getDirectoryListingIterator()
diff --git a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java
index 6e7db29..5ec57c0b 100644
--- a/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/MaxSizeSplitHintSpecTest.java
@@ -20,8 +20,8 @@
 package org.apache.druid.data.input;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import nl.jqno.equalsverifier.EqualsVerifier;
-import org.apache.commons.compress.utils.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -80,6 +80,29 @@ public class MaxSizeSplitHintSpecTest
   }
 
   @Test
+  public void testSplitSkippingEmptyInputs()
+  {
+    final int nonEmptyInputSize = 3;
+    final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
+    final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
+    final IntStream dataStream = IntStream.concat(
+        IntStream.concat(
+            IntStream.generate(() -> 0).limit(10),
+            IntStream.generate(() -> nonEmptyInputSize).limit(10)
+        ),
+        IntStream.generate(() -> 0).limit(10)
+    );
+    final List<List<Integer>> splits = Lists.newArrayList(
+        splitHintSpec.split(dataStream.iterator(), inputAttributeExtractor)
+    );
+    Assert.assertEquals(4, splits.size());
+    Assert.assertEquals(3, splits.get(0).size());
+    Assert.assertEquals(3, splits.get(1).size());
+    Assert.assertEquals(3, splits.get(2).size());
+    Assert.assertEquals(1, splits.get(3).size());
+  }
+
+  @Test
   public void testEquals()
   {
     EqualsVerifier.forClass(MaxSizeSplitHintSpec.class).withNonnullFields("maxSplitSize").usingGetClass().verify();
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
index 6d7342c..9781435 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
@@ -33,6 +34,9 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -66,7 +70,7 @@ public class LocalInputSourceTest
   {
     final long fileSize = 15;
     final long maxSplitSize = 50;
-    final Set<File> files = prepareFiles(10, fileSize);
+    final Set<File> files = mockFiles(10, fileSize);
     final LocalInputSource inputSource = new LocalInputSource(null, null, files);
     final List<InputSplit<List<File>>> splits = inputSource
         .createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize))
@@ -83,7 +87,7 @@ public class LocalInputSourceTest
   {
     final long fileSize = 13;
     final long maxSplitSize = 40;
-    final Set<File> files = prepareFiles(10, fileSize);
+    final Set<File> files = mockFiles(10, fileSize);
     final LocalInputSource inputSource = new LocalInputSource(null, null, files);
     Assert.assertEquals(
         4,
@@ -97,11 +101,19 @@ public class LocalInputSourceTest
     File baseDir = temporaryFolder.newFolder();
     List<File> filesInBaseDir = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
-      filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
+      final File file = File.createTempFile("local-input-source", ".data", baseDir);
+      try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
+        writer.write("test");
+      }
+      filesInBaseDir.add(file);
     }
     Set<File> files = new HashSet<>(filesInBaseDir.subList(0, 5));
     for (int i = 0; i < 3; i++) {
-      files.add(File.createTempFile("local-input-source", ".data", baseDir));
+      final File file = File.createTempFile("local-input-source", ".data", baseDir);
+      try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
+        writer.write("test");
+      }
+      files.add(file);
     }
     Set<File> expectedFiles = new HashSet<>(filesInBaseDir);
     expectedFiles.addAll(files);
@@ -117,7 +129,11 @@ public class LocalInputSourceTest
     File baseDir = temporaryFolder.newFolder();
     Set<File> filesInBaseDir = new HashSet<>();
     for (int i = 0; i < 10; i++) {
-      filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
+      final File file = File.createTempFile("local-input-source", ".data", baseDir);
+      try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
+        writer.write("test");
+      }
+      filesInBaseDir.add(file);
     }
     Iterator<File> fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator();
     Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
@@ -130,14 +146,28 @@ public class LocalInputSourceTest
     File baseDir = temporaryFolder.newFolder();
     Set<File> filesInBaseDir = new HashSet<>();
     for (int i = 0; i < 10; i++) {
-      filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
+      final File file = File.createTempFile("local-input-source", ".data", baseDir);
+      try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
+        writer.write("test");
+      }
+      filesInBaseDir.add(file);
     }
     Iterator<File> fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator();
     Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
     Assert.assertEquals(filesInBaseDir, actualFiles);
   }
 
-  private static Set<File> prepareFiles(int numFiles, long fileSize)
+  @Test
+  public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly()
+  {
+    final Set<File> files = new HashSet<>(mockFiles(10, 5));
+    files.addAll(mockFiles(10, 0));
+    final LocalInputSource inputSource = new LocalInputSource(null, null, files);
+    List<File> iteratedFiles = Lists.newArrayList(inputSource.getFileIterator());
+    Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0));
+  }
+
+  private static Set<File> mockFiles(int numFiles, long fileSize)
   {
     final Set<File> files = new HashSet<>();
     for (int i = 0; i < numFiles; i++) {
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 39e838a..a3e7d1c 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -74,7 +74,7 @@ You may want to consider the below things:
 
 - You may want to control the amount of input data each worker task processes. This can be
   controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details).
-  For the tasks that read data from the `inputSource`, you can set the [SplitHintSpec](#splithintspec) in the `tuningConfig`.
+  For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`.
   For the tasks that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`.
 - The number of concurrent worker tasks in parallel ingestion is determined by `maxNumConcurrentSubTasks` in the `tuningConfig`.
   The supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks`
@@ -202,7 +202,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and the user does not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
 |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
 |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
-|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [SplitHintSpec](#splithintspec) for more details.|null|`maxSize`|
+|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no|
 |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
 |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
 |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|sa [...]
@@ -219,23 +219,23 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
 |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|
 
-### `splitHintSpec`
+### Split Hint Spec
 
-`SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
+The split hint spec is used to give a hint when the supervisor task creates input splits.
 Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase.
 
-#### `MaxSizeSplitHintSpec`
+#### Size-based Split Hint Spec
 
-`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source.
+The size-based split hint spec is respected by all splittable input sources except for the HTTP input source.
 
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `maxSize`.|none|yes|
 |maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (files are not yet split across tasks).|500MB|no|
 
-#### `SegmentsSplitHintSpec`
+#### Segments Split Hint Spec
 
-`SegmentsSplitHintSpec` is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)).
+The segments split hint spec is used only for [`DruidInputSource`](#druid-input-source) (and legacy [`IngestSegmentFirehose`](#ingestsegmentfirehose)).
 
 |property|description|default|required?|
 |--------|-----------|-------|---------|
@@ -294,7 +294,7 @@ How the worker task creates segments is:
 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
 The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
 - In the `partial segment generation` phase, just like the Map phase in MapReduce,
-the Parallel task splits the input data based on `splitHintSpec`
+the Parallel task splits the input data based on the split hint spec
 and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split,
 and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec`
 and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`.
@@ -326,7 +326,7 @@ The first phase is to collect some statistics to find
 the best partitioning and the other 2 phases are to create partial segments
 and to merge them, respectively, as in hash-based partitioning.
 - In the `partial dimension distribution` phase, the Parallel task splits the input data and
-assigns them to worker tasks based on `splitHintSpec`. Each worker task (type `partial_dimension_distribution`) reads
+assigns them to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads
 the assigned split and builds a histogram for `partitionDimension`.
 The Parallel task collects those histograms from worker tasks and finds
 the best range partitioning based on `partitionDimension` to evenly
@@ -839,10 +839,12 @@ Sample specs:
 |--------|-----------|-------|---------|
 |type|This should be `s3`.|None|yes|
 |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
-|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
+|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
 |properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)
 
+Note that the S3 input source will skip all empty objects only when `prefixes` is specified.
+
 S3 Object:
 
 |property|description|default|required?|
@@ -927,9 +929,11 @@ Sample specs:
 |--------|-----------|-------|---------|
 |type|This should be `google`.|None|yes|
 |uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
-|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
+|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
 
+Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
+
 Google Cloud Storage object:
 
 |property|description|default|required?|
@@ -1004,9 +1008,11 @@ Sample specs:
 |--------|-----------|-------|---------|
 |type|This should be `azure`.|None|yes|
 |uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
-|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>"|None|`uris` or `prefixes` or `objects` must be set|
+|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>". Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
 
+Note that the Azure input source will skip all empty objects only when `prefixes` is specified.
+
 Azure Blob object:
 
 |property|description|default|required?|
@@ -1092,7 +1098,7 @@ Sample specs:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be `hdfs`.|None|yes|
-|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|None|yes|
+|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
 
 You can also ingest from cloud storage using the HDFS input source.
 However, if you want to read from AWS S3 or Google Cloud Storage, consider using
@@ -1233,8 +1239,8 @@ Sample spec:
 |--------|-----------|---------|
 |type|This should be "local".|yes|
 |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes if `baseDir` is specified|
-|baseDir|Directory to search recursively for files to be ingested. |At least one of `baseDir` or `files` should be specified|
-|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. |At least one of `baseDir` or `files` should be specified|
+|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
+|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|
 
 ### Druid Input Source
 
@@ -1529,7 +1535,7 @@ This firehose will accept any type of parser, but will only utilize the list of
 |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
 |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
 |filter| See [Filters](../querying/filters.md)|no|
-|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
+|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
 
 <a name="sql-firehose"></a>
 
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
index 72d6509..c2a696c 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java
@@ -150,7 +150,7 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
       while (blobItemIterator.hasNext()) {
         ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next());
         /* skip directory objects */
-        if (blobItem.isCloudBlob()) {
+        if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) {
           currentBlobItem = blobItem.getCloudBlob();
           return;
         }
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 35d1e5f..8a7f16c 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -148,7 +148,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
     EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
             .andReturn(CLOUD_OBJECT_LOCATION_1);
-    EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L);
+    EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
     replayAll();
 
     azureInputSource = new AzureInputSource(
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
index e0fd4b4..22bbdba 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureCloudBlobIteratorTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.storage.azure;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.microsoft.azure.storage.ResultContinuation;
 import com.microsoft.azure.storage.ResultSegment;
 import com.microsoft.azure.storage.StorageException;
@@ -127,10 +128,12 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
     blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class);
     cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
     cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class);
+    EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes();
 
     blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class);
     cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class);
     cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class);
+    EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes();
 
     blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class);
     directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
@@ -138,6 +141,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
     blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class);
     cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
     cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class);
+    EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes();
 
     blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class);
     directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class);
@@ -163,13 +167,13 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
     EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
     EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
     EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
-        cloudBlobDruidPrefixWithOnlyCloudBlobs1);
+        cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
     EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
         cloudBlobItemPrefixWithOnlyCloudBlobs1);
 
     EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true);
     EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn(
-        cloudBlobDruidPrefixWithOnlyCloudBlobs2);
+        cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes();
     EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn(
         cloudBlobItemPrefixWithOnlyCloudBlobs2);
 
@@ -179,7 +183,7 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
 
     EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true);
     EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn(
-        cloudBlobDruidPrefixWithCloudBlobsAndDirectories);
+        cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes();
     EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn(
         cloudBlobItemPrefixWithCloudBlobsAndDirectories);
 
@@ -273,6 +277,57 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException
+  {
+    EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
+    EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
+    EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
+        cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
+    EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
+        cloudBlobItemPrefixWithOnlyCloudBlobs1);
+
+    ListBlobItem emptyBlobItem = createMock(ListBlobItem.class);
+    ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class);
+    CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class);
+    EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes();
+    EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true);
+    EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes();
+
+    EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder);
+
+    EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
+        CONTAINER1,
+        PREFIX_ONLY_CLOUD_BLOBS,
+        nullResultContinuationToken,
+        MAX_LISTING_LENGTH
+    )).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1);
+
+    EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken())
+            .andReturn(nullResultContinuationToken);
+    ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>();
+    resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1);
+    resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem);
+    EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults())
+            .andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1);
+
+    replayAll();
+
+    azureCloudBlobIterator = new AzureCloudBlobIterator(
+        storage,
+        blobItemDruidFactory,
+        config,
+        ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI),
+        MAX_LISTING_LENGTH
+    );
+
+    List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1);
+    List<CloudBlobHolder> actualBlobItems = Lists.newArrayList(azureCloudBlobIterator);
+    Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
+    Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems));
+    verifyAll();
+  }
+
   @Test(expected = NoSuchElementException.class)
   public void test_next_emptyPrefixes_throwsNoSuchElementException()
   {
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index b1ae3a4..9d37b3f 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -47,7 +47,7 @@ import java.util.stream.Stream;
 
 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
-  static final String SCHEME = "gs";
+  public static final String SCHEME = "gs";
 
   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
 
@@ -117,7 +117,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 
   private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
   {
-    return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName());
+    return GoogleUtils.objectToCloudObjectLocation(storageObject);
   }
 
   private Iterable<StorageObject> storageObjectIterable()
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
index 9fbb23f..2c18178 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
@@ -20,18 +20,15 @@
 package org.apache.druid.storage.google;
 
 import com.google.api.client.http.HttpResponseException;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Predicate;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.java.util.common.RetryUtils;
-import org.apache.druid.java.util.common.StringUtils;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Iterator;
-import java.util.NoSuchElementException;
 
 public class GoogleUtils
 {
@@ -46,11 +43,20 @@ public class GoogleUtils
     return t instanceof IOException;
   }
 
-  private static <T> T retryGoogleCloudStorageOperation(RetryUtils.Task<T> f) throws Exception
+  static <T> T retryGoogleCloudStorageOperation(RetryUtils.Task<T> f) throws Exception
   {
     return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES);
   }
 
+  public static URI objectToUri(StorageObject object)
+  {
+    return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME);
+  }
+
+  public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object)
+  {
+    return new CloudObjectLocation(object.getBucket(), object.getName());
+  }
 
   public static Iterator<StorageObject> lazyFetchingStorageObjectsIterator(
       final GoogleStorage storage,
@@ -58,91 +64,6 @@ public class GoogleUtils
       final long maxListingLength
   )
   {
-    return new Iterator<StorageObject>()
-    {
-      private Storage.Objects.List listRequest;
-      private Objects results;
-      private URI currentUri;
-      private String currentBucket;
-      private String currentPrefix;
-      private String nextPageToken;
-      private Iterator<StorageObject> storageObjectsIterator;
-
-      {
-        nextPageToken = null;
-        prepareNextRequest();
-        fetchNextBatch();
-      }
-
-      private void prepareNextRequest()
-      {
-        try {
-          currentUri = uris.next();
-          currentBucket = currentUri.getAuthority();
-          currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
-          nextPageToken = null;
-          listRequest = storage.list(currentBucket)
-                               .setPrefix(currentPrefix)
-                               .setMaxResults(maxListingLength);
-
-        }
-        catch (IOException io) {
-          throw new RuntimeException(io);
-        }
-      }
-
-      private void fetchNextBatch()
-      {
-        try {
-          listRequest.setPageToken(nextPageToken);
-          results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
-          storageObjectsIterator = results.getItems().iterator();
-          nextPageToken = results.getNextPageToken();
-        }
-        catch (Exception ex) {
-          throw new RuntimeException(ex);
-        }
-      }
-
-      @Override
-      public boolean hasNext()
-      {
-        return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext();
-      }
-
-      @Override
-      public StorageObject next()
-      {
-        if (!hasNext()) {
-          throw new NoSuchElementException();
-        }
-
-        while (storageObjectsIterator.hasNext()) {
-          final StorageObject next = storageObjectsIterator.next();
-          // list with prefix can return directories, but they should always end with `/`, ignore them
-          if (!next.getName().endsWith("/")) {
-            return next;
-          }
-        }
-
-        if (nextPageToken != null) {
-          fetchNextBatch();
-        } else if (uris.hasNext()) {
-          prepareNextRequest();
-          fetchNextBatch();
-        }
-
-        if (!storageObjectsIterator.hasNext()) {
-          throw new ISE(
-              "Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]",
-              currentBucket,
-              currentPrefix,
-              nextPageToken
-          );
-        }
-
-        return next();
-      }
-    };
+    return new ObjectStorageIterator(storage, uris, maxListingLength);
   }
 }
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java
new file mode 100644
index 0000000..1027511
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google;
+
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over the {@link StorageObject}s listed under a sequence of Google Cloud Storage prefix URIs.
+ *
+ * For each URI, the authority is used as the bucket and the path (after
+ * {@link StringUtils#maybeRemoveLeadingSlash}) as the listing prefix. Pages of at most {@code maxListingLength}
+ * results are fetched lazily. Objects whose name ends with {@code /} and objects whose size is not positive are
+ * skipped.
+ */
+public class ObjectStorageIterator implements Iterator<StorageObject>
+{
+  private final GoogleStorage storage;
+  private final Iterator<URI> uris;
+  private final long maxListingLength;
+
+  private Storage.Objects.List listRequest;
+  private Objects results;
+  private URI currentUri;
+  private String nextPageToken;
+  private Iterator<StorageObject> storageObjectsIterator;
+  // Object that the next call to next() returns; null once nothing is left.
+  private StorageObject currentObject;
+
+  /**
+   * Prepares the listing for the first URI, fetches its first page and positions the iterator on the first
+   * eligible object.
+   *
+   * @param storage          client used to build list requests
+   * @param uris             prefix URIs to list, consumed in order; {@code next()} is called on it unconditionally
+   * @param maxListingLength value passed to {@code setMaxResults} on every list request
+   */
+  public ObjectStorageIterator(GoogleStorage storage, Iterator<URI> uris, long maxListingLength)
+  {
+    this.storage = storage;
+    this.uris = uris;
+    this.maxListingLength = maxListingLength;
+    this.nextPageToken = null;
+
+    prepareNextRequest();
+    fetchNextBatch();
+    advanceStorageObject();
+  }
+
+  /**
+   * Takes the next URI and builds a list request for its bucket and prefix, resetting the page token.
+   */
+  private void prepareNextRequest()
+  {
+    try {
+      currentUri = uris.next();
+      String currentBucket = currentUri.getAuthority();
+      String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
+      nextPageToken = null;
+      listRequest = storage.list(currentBucket)
+                           .setPrefix(currentPrefix)
+                           .setMaxResults(maxListingLength);
+
+    }
+    catch (IOException io) {
+      throw new RuntimeException(io);
+    }
+  }
+
+  /**
+   * Executes the current list request for the page identified by {@link #nextPageToken} and exposes the page's
+   * items through {@link #storageObjectsIterator}. Failures are rethrown wrapped in a {@link RuntimeException}.
+   */
+  private void fetchNextBatch()
+  {
+    try {
+      listRequest.setPageToken(nextPageToken);
+      results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
+      // getItems() is null when the response carries no "items" field, e.g. when nothing matches the prefix.
+      // Treat that as an empty page instead of failing with a NullPointerException.
+      final List<StorageObject> items = results.getItems();
+      storageObjectsIterator = items == null ? Collections.emptyIterator() : items.iterator();
+      nextPageToken = results.getNextPageToken();
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return currentObject != null;
+  }
+
+  @Override
+  public StorageObject next()
+  {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    final StorageObject retVal = currentObject;
+    advanceStorageObject();
+    return retVal;
+  }
+
+  /**
+   * Moves {@link #currentObject} to the next eligible object, fetching further pages and moving on to further
+   * URIs as needed. Sets it to null when every page of every URI has been consumed.
+   */
+  private void advanceStorageObject()
+  {
+    while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) {
+      while (storageObjectsIterator.hasNext()) {
+        final StorageObject next = storageObjectsIterator.next();
+        // list with prefix can return directories, but they should always end with `/`, ignore them.
+        // also skips empty objects.
+        if (!next.getName().endsWith("/") && next.getSize().signum() > 0) {
+          currentObject = next;
+          return;
+        }
+      }
+
+      if (nextPageToken != null) {
+        fetchNextBatch();
+      } else if (uris.hasNext()) {
+        prepareNextRequest();
+        fetchNextBatch();
+      }
+    }
+
+    // Truly nothing left to read.
+    currentObject = null;
+  }
+}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java
new file mode 100644
index 0000000..4d1504f
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google;
+
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigInteger;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ObjectStorageIteratorTest
+{
+  private static final ImmutableList<StorageObject> TEST_OBJECTS =
+      ImmutableList.of(
+          makeStorageObject("b", "foo", 10L),
+          makeStorageObject("b", "foo/", 0L), // directory
+          makeStorageObject("b", "foo/bar1", 10L),
+          makeStorageObject("b", "foo/bar2", 10L),
+          makeStorageObject("b", "foo/bar3", 10L),
+          makeStorageObject("b", "foo/bar4", 10L),
+          makeStorageObject("b", "foo/bar5", 0L), // empty object
+          makeStorageObject("b", "foo/baz", 10L),
+          makeStorageObject("bucketnotmine", "a/different/bucket", 10L),
+          makeStorageObject("b", "foo/bar/", 0L) // another directory at the end of list
+      );
+
+  @Test
+  public void testSingleObject()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/baz"),
+        ImmutableList.of("gs://b/foo/baz"),
+        5
+    );
+  }
+
+  @Test
+  public void testMultiObjectOneKeyAtATime()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
+        ImmutableList.of("gs://b/foo/"),
+        1
+    );
+  }
+
+  @Test
+  public void testMultiObjectTwoKeysAtATime()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
+        ImmutableList.of("gs://b/foo/"),
+        2
+    );
+  }
+
+  @Test
+  public void testMultiObjectTenKeysAtATime()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
+        ImmutableList.of("gs://b/foo/"),
+        10
+    );
+  }
+
+  @Test
+  public void testPrefixInMiddleOfKey()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4"),
+        ImmutableList.of("gs://b/foo/bar"),
+        10
+    );
+  }
+
+  @Test
+  public void testNoPath()
+  {
+    test(
+        ImmutableList.of(
+            "gs://b/foo",
+            "gs://b/foo/bar1",
+            "gs://b/foo/bar2",
+            "gs://b/foo/bar3",
+            "gs://b/foo/bar4",
+            "gs://b/foo/baz"
+        ),
+        ImmutableList.of("gs://b"),
+        10
+    );
+  }
+
+  @Test
+  public void testSlashPath()
+  {
+    test(
+        ImmutableList.of(
+            "gs://b/foo",
+            "gs://b/foo/bar1",
+            "gs://b/foo/bar2",
+            "gs://b/foo/bar3",
+            "gs://b/foo/bar4",
+            "gs://b/foo/baz"
+        ),
+        ImmutableList.of("gs://b/"),
+        10
+    );
+  }
+
+  @Test
+  public void testDifferentBucket()
+  {
+    test(
+        ImmutableList.of(),
+        ImmutableList.of("gs://bx/foo/"),
+        10
+    );
+  }
+
+  @Test
+  public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes()
+  {
+    test(
+        ImmutableList.of("gs://b/foo/bar1", "gs://b/foo/bar2", "gs://b/foo/bar3", "gs://b/foo/bar4", "gs://b/foo/baz"),
+        ImmutableList.of("gs://b/foo/bar", "gs://b/foo/baz"),
+        10
+    );
+  }
+
+  private static void test(
+      final List<String> expectedUris,
+      final List<String> prefixes,
+      final int maxListingLength
+  )
+  {
+    final List<StorageObject> expectedObjects = new ArrayList<>();
+
+    // O(N^2) but who cares -- the list is short.
+    for (final String uri : expectedUris) {
+      final List<StorageObject> matches = TEST_OBJECTS
+          .stream()
+          .filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri))
+          .collect(Collectors.toList());
+
+      expectedObjects.add(Iterables.getOnlyElement(matches));
+    }
+
+    final List<StorageObject> actualObjects = ImmutableList.copyOf(
+        GoogleUtils.lazyFetchingStorageObjectsIterator(
+            makeMockClient(TEST_OBJECTS),
+            prefixes.stream().map(URI::create).iterator(),
+            maxListingLength
+        )
+    );
+
+    Assert.assertEquals(
+        prefixes.toString(),
+        expectedObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList()),
+        actualObjects.stream().map(GoogleUtils::objectToUri).collect(Collectors.toList())
+    );
+  }
+
+  /**
+   * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the
+   * {@link ObjectStorageIterator} class.
+   */
+  private static GoogleStorage makeMockClient(final List<StorageObject> storageObjects)
+  {
+    return new GoogleStorage(null)
+    {
+      @Override
+      public Storage.Objects.List list(final String bucket)
+      {
+        return mockList(bucket, storageObjects);
+      }
+    };
+  }
+
+  @SuppressWarnings("UnnecessaryFullyQualifiedName")
+  static class MockStorage extends Storage
+  {
+    private MockStorage()
+    {
+      super(
+          EasyMock.niceMock(HttpTransport.class),
+          EasyMock.niceMock(JsonFactory.class),
+          EasyMock.niceMock(HttpRequestInitializer.class)
+      );
+    }
+
+    private MockList mockList(String bucket, java.util.List<StorageObject> storageObjects)
+    {
+      return new MockObjects().mockList(bucket, storageObjects);
+    }
+
+    class MockObjects extends Storage.Objects
+    {
+      private MockList mockList(String bucket, java.util.List<StorageObject> storageObjects)
+      {
+        return new MockList(bucket, storageObjects);
+      }
+
+      class MockList extends Objects.List
+      {
+        private final java.util.List<StorageObject> storageObjects;
+
+        private MockList(String bucket, java.util.List<StorageObject> storageObjects)
+        {
+          super(bucket);
+          this.storageObjects = storageObjects;
+        }
+
+        @Override
+        public com.google.api.services.storage.model.Objects execute()
+        {
+          // The page token is an index into the "storageObjects" list; a null token means start from the beginning.
+          final String continuationToken = getPageToken();
+          final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
+
+          // Find matching objects.
+          java.util.List<StorageObject> objects = new ArrayList<>();
+          int nextIndex = -1;
+
+          for (int i = startIndex; i < storageObjects.size(); i++) {
+            final StorageObject storageObject = storageObjects.get(i);
+
+            if (storageObject.getBucket().equals(getBucket())
+                && storageObject.getName().startsWith(getPrefix())) {
+
+              if (objects.size() == getMaxResults()) {
+                // We reached the max results limit; remember this index so it is returned as the next page token.
+                nextIndex = i;
+                break;
+              }
+
+              // Add the matching object to the current page.
+              objects.add(storageObject);
+            }
+          }
+
+          com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects();
+          retVal.setItems(objects);
+          if (nextIndex >= 0) {
+            retVal.setNextPageToken(String.valueOf(nextIndex));
+          } else {
+            retVal.setNextPageToken(null);
+          }
+          return retVal;
+        }
+      }
+    }
+  }
+
+  private static MockList mockList(String bucket, List<StorageObject> storageObjects)
+  {
+    return new MockStorage().mockList(bucket, storageObjects);
+  }
+
+  private static StorageObject makeStorageObject(final String bucket, final String key, final long size)
+  {
+    final StorageObject summary = new StorageObject();
+    summary.setBucket(bucket);
+    summary.setName(key);
+    summary.setSize(BigInteger.valueOf(size));
+    return summary;
+  }
+}
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 661a078..be7f3c8 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -116,6 +116,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
 
     return new HdfsFileInputFormat().getSplits(job)
                                     .stream()
+                                    .filter(split -> ((FileSplit) split).getLength() > 0)
                                     .map(split -> ((FileSplit) split).getPath())
                                     .collect(Collectors.toSet());
   }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
index 0e791cd..a515c15 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java
@@ -125,8 +125,8 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
     while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
       while (objectSummaryIterator.hasNext()) {
         currentObjectSummary = objectSummaryIterator.next();
-
-        if (!isDirectoryPlaceholder(currentObjectSummary)) {
+        // skips directories and empty objects
+        if (!isDirectoryPlaceholder(currentObjectSummary) && currentObjectSummary.getSize() > 0) {
           return;
         }
       }
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 59b6303..122f4fd 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -374,8 +374,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   public void testWithPrefixesSplit()
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
-    expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+    expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
     EasyMock.replay(S3_CLIENT);
 
     S3InputSource inputSource = new S3InputSource(
@@ -401,8 +401,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   public void testCreateSplitsWithSplitHintSpecRespectingHint()
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
-    expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+    expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
     EasyMock.replay(S3_CLIENT);
 
     S3InputSource inputSource = new S3InputSource(
@@ -412,7 +412,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
         null,
         PREFIXES,
         null,
-        null);
+        null
+    );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -427,10 +428,39 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects()
+  {
+    EasyMock.reset(S3_CLIENT);
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+    expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), new byte[0]);
+    EasyMock.replay(S3_CLIENT);
+
+    S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        PREFIXES,
+        null,
+        null
+    );
+
+    Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
+        new JsonInputFormat(JSONPathSpec.DEFAULT, null),
+        null
+    );
+    Assert.assertEquals(
+        ImmutableList.of(ImmutableList.of(new CloudObjectLocation(EXPECTED_URIS.get(0)))),
+        splits.map(InputSplit::get).collect(Collectors.toList())
+    );
+    EasyMock.verify(S3_CLIENT);
+  }
+
+  @Test
   public void testAccessDeniedWhileListingPrefix()
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
     expectListObjectsAndThrowAccessDenied(EXPECTED_URIS.get(1));
     EasyMock.replay(S3_CLIENT);
 
@@ -459,8 +489,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   public void testReader() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
-    expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+    expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
     expectGetObject(EXPECTED_URIS.get(0));
     expectGetObject(EXPECTED_URIS.get(1));
     EasyMock.replay(S3_CLIENT);
@@ -503,8 +533,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   public void testCompressedReader() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
-    expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)), CONTENT);
+    expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)), CONTENT);
     expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0));
     expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1));
     EasyMock.replay(S3_CLIENT);
@@ -543,7 +573,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     EasyMock.verify(S3_CLIENT);
   }
 
-  private static void expectListObjects(URI prefix, List<URI> uris)
+  private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
   {
     final ListObjectsV2Result result = new ListObjectsV2Result();
     result.setBucketName(prefix.getAuthority());
@@ -554,7 +584,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
       final S3ObjectSummary objectSummary = new S3ObjectSummary();
       objectSummary.setBucketName(bucket);
       objectSummary.setKey(key);
-      objectSummary.setSize(CONTENT.length);
+      objectSummary.setSize(content.length);
       result.getObjectSummaries().add(objectSummary);
     }
 
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java
index d63409c..ea2ca4a 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java
@@ -37,13 +37,15 @@ public class ObjectSummaryIteratorTest
   private static final ImmutableList<S3ObjectSummary> TEST_OBJECTS =
       ImmutableList.of(
           makeObjectSummary("b", "foo", 10L),
-          makeObjectSummary("b", "foo/", 0L),
+          makeObjectSummary("b", "foo/", 0L), // directory
           makeObjectSummary("b", "foo/bar1", 10L),
           makeObjectSummary("b", "foo/bar2", 10L),
           makeObjectSummary("b", "foo/bar3", 10L),
           makeObjectSummary("b", "foo/bar4", 10L),
+          makeObjectSummary("b", "foo/bar5", 0L), // empty object
           makeObjectSummary("b", "foo/baz", 10L),
-          makeObjectSummary("bucketnotmine", "a/different/bucket", 10L)
+          makeObjectSummary("bucketnotmine", "a/different/bucket", 10L),
+          makeObjectSummary("b", "foo/bar/", 0L) // another directory at the end of list
       );
 
   @Test
@@ -140,6 +142,16 @@ public class ObjectSummaryIteratorTest
     );
   }
 
+  @Test
+  public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes()
+  {
+    test(
+        ImmutableList.of("s3://b/foo/bar1", "s3://b/foo/bar2", "s3://b/foo/bar3", "s3://b/foo/bar4", "s3://b/foo/baz"),
+        ImmutableList.of("s3://b/foo/bar", "s3://b/foo/baz"),
+        10
+    );
+  }
+
   private static void test(
       final List<String> expectedUris,
       final List<String> prefixes,
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java
index 8e144c2..f839d3d 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinderTest.java
@@ -46,10 +46,12 @@ public class S3TimestampVersionedDataFinderTest
     object0.setBucketName(bucket);
     object0.setKey(keyPrefix + "/renames-0.gz");
     object0.setLastModified(new Date(0));
+    object0.setSize(10);
 
     object1.setBucketName(bucket);
     object1.setKey(keyPrefix + "/renames-1.gz");
     object1.setLastModified(new Date(1));
+    object1.setSize(10);
 
     final ListObjectsV2Result result = new ListObjectsV2Result();
     result.getObjectSummaries().add(object0);
@@ -116,6 +118,7 @@ public class S3TimestampVersionedDataFinderTest
     object0.setBucketName(bucket);
     object0.setKey(keyPrefix + "/renames-0.gz");
     object0.setLastModified(new Date(0));
+    object0.setSize(10);
 
     final ListObjectsV2Result result = new ListObjectsV2Result();
     result.getObjectSummaries().add(object0);
@@ -153,6 +156,7 @@ public class S3TimestampVersionedDataFinderTest
     object0.setBucketName(bucket);
     object0.setKey(keyPrefix + "/renames-0.gz");
     object0.setLastModified(new Date(0));
+    object0.setSize(10);
 
     final ListObjectsV2Result result = new ListObjectsV2Result();
     result.getObjectSummaries().add(object0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org