Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/02/13 21:03:03 UTC

[GitHub] [druid] jihoonson opened a new pull request #9360: Create splits of multiple files for parallel indexing

jihoonson opened a new pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360
 
 
   ### Description
   
   Currently, the Parallel task creates a sub task per input file. This can be inefficient when you have lots of small files because each task incurs overhead for scheduling, JVM startup, etc.
   
   This PR adds a new `MaxSizeSplitHintSpec` and allows the Parallel task to create splits of multiple files. If a split has only one file, that file can be larger than the configured `maxSize`. Otherwise, the total size of the files in the same split cannot be larger than `maxSize`. This means that if you have a very large file, a single task will process that big file by itself. This could be addressed in the future by creating multiple splits for the same file, each of which references a disjoint part of the file.
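
   A small usage sketch of the grouping behavior described above, based on the `MaxSizeSplitHintSpec` source quoted later in this thread. The `InputFileAttribute(long)` constructor, the class name `SplitHintExample`, and the example sizes are assumptions for illustration only.

   ```java
   import org.apache.druid.data.input.InputFileAttribute;
   import org.apache.druid.data.input.MaxSizeSplitHintSpec;

   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   public class SplitHintExample
   {
     public static void main(String[] args)
     {
       // Group "files" (represented here just by their sizes) into splits of at most 100 bytes.
       final MaxSizeSplitHintSpec hintSpec = new MaxSizeSplitHintSpec(100L);
       final List<Long> fileSizes = Arrays.asList(40L, 30L, 20L, 500L, 10L);

       // Assumes InputFileAttribute has a long-size constructor exposing getSize().
       final Iterator<List<Long>> splits = hintSpec.split(fileSizes.iterator(), size -> new InputFileAttribute(size));

       // Prints [40, 30, 20], then [500] (the oversized file gets its own split), then [10].
       while (splits.hasNext()) {
         System.out.println(splits.next());
       }
     }
   }
   ```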
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479590
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -21,40 +21,64 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.filefilter.AndFileFilter;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.NameFileFilter;
+import org.apache.commons.io.filefilter.NotFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
-public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<File>
+public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
 
 Review comment:
   Oops, added.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479719
 
 

 ##########
 File path: extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
 ##########
 @@ -59,23 +65,42 @@ public GoogleCloudStorageInputSource(
   }
 
   @Override
-  protected GoogleCloudStorageEntity createEntity(InputSplit<CloudObjectLocation> split)
+  protected InputEntity createEntity(CloudObjectLocation location)
   {
-    return new GoogleCloudStorageEntity(storage, split.get());
+    return new GoogleCloudStorageEntity(storage, location);
   }
 
   @Override
-  protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
+  protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
   {
-    return StreamSupport.stream(storageObjectIterable().spliterator(), false)
-                        .map(this::byteSourceFromStorageObject)
-                        .map(InputSplit::new);
+    final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
+        storageObjectIterable().iterator(),
+        storageObject -> {
+          final BigInteger sizeInBigInteger = storageObject.getSize();
+          long sizeInLong;
+          if (sizeInBigInteger == null) {
+            sizeInLong = Long.MAX_VALUE;
+          } else {
+            try {
+              sizeInLong = sizeInBigInteger.longValueExact();
+            }
+            catch (ArithmeticException e) {
+              sizeInLong = Long.MAX_VALUE;
 
 Review comment:
   The length of a Google Storage object is an unsigned long (https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations). I think it's better to keep working instead of failing. Added a warning log about the exception.
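
   For reference, a minimal sketch of the clamping behavior described above: a null or overflowing size falls back to Long.MAX_VALUE with a warning instead of failing the split computation. The helper name and the use of `java.util.logging` are illustrative, not the PR's exact code.

   ```java
   import java.math.BigInteger;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   final class StorageObjectSizes
   {
     private static final Logger LOG = Logger.getLogger(StorageObjectSizes.class.getName());

     // GCS reports object sizes as unsigned 64-bit values, so getSize() may be null or exceed Long.MAX_VALUE.
     static long sizeToLong(BigInteger size)
     {
       if (size == null) {
         return Long.MAX_VALUE; // unknown size: effectively "don't group this object with others"
       }
       try {
         return size.longValueExact();
       }
       catch (ArithmeticException e) {
         // Warn and fall back rather than failing the whole ingestion task.
         LOG.log(Level.WARNING, "Size [" + size + "] overflows long; using Long.MAX_VALUE", e);
         return Long.MAX_VALUE;
       }
     }
   }
   ```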



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382233152
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -226,7 +224,14 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 `SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
 Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase.
 
-Currently only one splitHintSpec, i.e., `segments`, is available.
+#### `MaxSizeSplitHintSpec`
+
+`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `maxSize`.|none|yes|
+|maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (splitting a large file is not supported yet).|500MB|no|
 
 Review comment:
   👍 I added "yet" at the end of the sentence since we may want to split files across tasks in the future.



[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382870695
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -21,40 +21,64 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.filefilter.AndFileFilter;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.NameFileFilter;
+import org.apache.commons.io.filefilter.NotFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
-public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<File>
+public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
 
 Review comment:
   Can add a `@Nullable` here



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382233017
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -34,28 +39,46 @@
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
 import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
   )
   {
-    this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
-    this.filter = Preconditions.checkNotNull(filter, "filter");
+    this.baseDir = baseDir;
+    this.filter = baseDir != null ? Preconditions.checkNotNull(filter, "filter") : filter;
+    this.files = files;
+
+    if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) {
+      throw new IAE("Either one of baseDir or files should be specified");
 
 Review comment:
   Oops, thanks. I'm not sure why we can't have both at the same time as long as we don't process the same file more than once. Allowing only one of them would be more aligned with the cloud input sources, though. (Also, why do we do that?)



[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382949285
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
 
 Review comment:
   I think you should make the spec classes pure data objects (or beans). Adding methods like `split` to them makes them complicated and embeds logic that will make them hard to version in the future. We should think of data objects as literals, not as objects with business logic.
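
   For what it's worth, a hypothetical sketch of that separation: the spec stays a plain data holder and a separate strategy owns the grouping logic (the grouping body itself is omitted here). All names are invented for illustration; the PR keeps `split()` on the spec itself.

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.ToLongFunction;

   // Pure data object: only fields, a constructor, and getters.
   final class MaxSizeSplitHint
   {
     private final long maxSplitSize;

     MaxSizeSplitHint(long maxSplitSize)
     {
       this.maxSplitSize = maxSplitSize;
     }

     long getMaxSplitSize()
     {
       return maxSplitSize;
     }
   }

   // Business logic lives behind a separate interface that can evolve independently of the JSON-facing spec.
   interface SplitGrouper<T>
   {
     Iterator<List<T>> group(Iterator<T> inputs, ToLongFunction<T> sizeOf);
   }

   final class MaxSizeSplitGrouper<T> implements SplitGrouper<T>
   {
     private final MaxSizeSplitHint hint;

     MaxSizeSplitGrouper(MaxSizeSplitHint hint)
     {
       this.hint = hint;
     }

     @Override
     public Iterator<List<T>> group(Iterator<T> inputs, ToLongFunction<T> sizeOf)
     {
       // The max-size grouping algorithm (as in the PR) would go here.
       throw new UnsupportedOperationException(
           "grouping up to " + hint.getMaxSplitSize() + " bytes is omitted in this sketch"
       );
     }
   }
   ```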



[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381508140
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
+to the Overlord which actually process the assigned input split and create segments.
+Once a worker task successfully processes all assigned input split, it reports the generated segment list to the supervisor task. 
 The supervisor task periodically checks the status of worker tasks. If one of them fails, it retries the failed task
-until the number of retries reaches to the configured limit. If all worker tasks succeed, then it publishes the reported segments at once and finalize the ingestion.
+until the number of retries reaches to the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalize the ingestion.
 
 Review comment:
   Light line edit:
   
   ```suggestion
   until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.
   ```



[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382870157
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
 ##########
 @@ -48,23 +48,23 @@
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      Stream<InputEntity> sourceStream,
+      Iterator<? extends InputEntity> sourceStream,
       File temporaryDirectory
   )
   {
-    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceStream.iterator()), temporaryDirectory);
+    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceStream), temporaryDirectory);
   }
 
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      CloseableIterator<InputEntity> sourceIterator,
+      CloseableIterator<? extends InputEntity> sourceIterator,
 
 Review comment:
   nit: could call this `sourceCloseableIterator`



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479988
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
 ##########
 @@ -56,6 +63,35 @@ public String getSegmentId()
   @JsonProperty
   public List<Interval> getIntervals()
   {
-    return intervals;
+    return Collections.unmodifiableList(intervals);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WindowedSegmentId segmentId1 = (WindowedSegmentId) o;
+    return Objects.equals(segmentId, segmentId1.segmentId) &&
+           Objects.equals(intervals, segmentId1.intervals);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(segmentId, intervals);
+  }
 
 Review comment:
   Added.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479658
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -21,40 +21,64 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.filefilter.AndFileFilter;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.NameFileFilter;
+import org.apache.commons.io.filefilter.NotFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
-public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<File>
+public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
 
 Review comment:
   Added.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479842
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
 
 Review comment:
   Good point. I agree it's a better structure, but the problem is that too many classes do this kind of thing, especially on the ingestion side. I don't think it's possible to apply the suggested design to all of them anytime soon. Also, I think it's better to promote SQL for ingestion as well, so that Druid users don't have to worry about API changes.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479917
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
+
+      @Override
+      public boolean hasNext()
+      {
+        return peeking != null || inputIterator.hasNext();
+      }
+
+      @Override
+      public List<T> next()
+      {
+        final List<T> current = new ArrayList<>();
+        long splitSize = 0;
+        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+          if (peeking == null) {
+            peeking = inputIterator.next();
+          }
+          final long size = inputAttributeExtractor.apply(peeking).getSize();
+          if (current.isEmpty()) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else if (splitSize + size < maxSplitSize) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else {
+            break;
+          }
+        }
+        assert !current.isEmpty();
+        return current;
+      }
+    };
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MaxSizeSplitHintSpec that = (MaxSizeSplitHintSpec) o;
+    return maxSplitSize == that.maxSplitSize;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxSplitSize);
+  }
+}
 
 Review comment:
   Added.



[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382871722
 
 

 ##########
 File path: extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
 ##########
 @@ -59,23 +65,42 @@ public GoogleCloudStorageInputSource(
   }
 
   @Override
-  protected GoogleCloudStorageEntity createEntity(InputSplit<CloudObjectLocation> split)
+  protected InputEntity createEntity(CloudObjectLocation location)
   {
-    return new GoogleCloudStorageEntity(storage, split.get());
+    return new GoogleCloudStorageEntity(storage, location);
   }
 
   @Override
-  protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
+  protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
   {
-    return StreamSupport.stream(storageObjectIterable().spliterator(), false)
-                        .map(this::byteSourceFromStorageObject)
-                        .map(InputSplit::new);
+    final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
+        storageObjectIterable().iterator(),
+        storageObject -> {
+          final BigInteger sizeInBigInteger = storageObject.getSize();
+          long sizeInLong;
+          if (sizeInBigInteger == null) {
+            sizeInLong = Long.MAX_VALUE;
+          } else {
+            try {
+              sizeInLong = sizeInBigInteger.longValueExact();
+            }
+            catch (ArithmeticException e) {
+              sizeInLong = Long.MAX_VALUE;
 
 Review comment:
   Should this propagate the exception instead? If we get an object with a byte size that can't be stored in a long, something seems very wrong



[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381499664
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
 
 Review comment:
   "and and" -> "and"



[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382949821
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
 
 Review comment:
   I think you can simplify the logic of the next method below if you initialize peeking to inputIterator.next(), and only set peeking to null when inputIterator.hasNext() is false. In your next() below, you would just keep shifting values from inputIterator into current after each iteration as long as there are more inputs.
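
   A self-contained sketch of that pre-fetch approach (not the PR's code; the helper name and the `ToLongFunction` parameter are invented for the example):

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.function.ToLongFunction;

   final class GroupingIterators
   {
     // Groups inputs into lists whose total size stays under maxSplitSize,
     // always keeping the next unconsumed element pre-fetched in `peeking`.
     static <T> Iterator<List<T>> group(Iterator<T> input, ToLongFunction<T> sizeOf, long maxSplitSize)
     {
       return new Iterator<List<T>>()
       {
         private T peeking = input.hasNext() ? input.next() : null;

         @Override
         public boolean hasNext()
         {
           return peeking != null;
         }

         @Override
         public List<T> next()
         {
           if (peeking == null) {
             throw new NoSuchElementException();
           }
           final List<T> current = new ArrayList<>();
           long splitSize = 0;
           while (peeking != null) {
             final long size = sizeOf.applyAsLong(peeking);
             // The first element always joins the split; later ones only if the total stays under maxSplitSize.
             if (!current.isEmpty() && splitSize + size >= maxSplitSize) {
               break;
             }
             current.add(peeking);
             splitSize += size;
             peeking = input.hasNext() ? input.next() : null;
           }
           return current;
         }
       };
     }
   }
   ```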



[GitHub] [druid] jon-wei merged pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei merged pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360
 
 
   



[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381506006
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
+to the Overlord which actually process the assigned input split and create segments.
+Once a worker task successfully processes all assigned input split, it reports the generated segment list to the supervisor task. 
 
 Review comment:
   It’s a little unclear to me who's doing what in this. Is the following accurate/clearer?
   
   “The `index_parallel` task is a supervisor task that orchestrates the indexing process. The task splits input data for processing by Overlord worker tasks, which process the input splits assigned to them and create segments from the input. Once a worker task successfully processes all assigned input splits, it reports the generated segment list to the supervisor task.”
    
   If not, for a lighter edit, maybe just clarify that it's the worker tasks more specifically, rather than the Overlord, that are processing input splits (if that's the case). 



[GitHub] [druid] clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382259194
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -34,28 +39,46 @@
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
 import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
   )
   {
-    this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
-    this.filter = Preconditions.checkNotNull(filter, "filter");
+    this.baseDir = baseDir;
+    this.filter = baseDir != null ? Preconditions.checkNotNull(filter, "filter") : filter;
+    this.files = files;
+
+    if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) {
+      throw new IAE("Either one of baseDir or files should be specified");
 
 Review comment:
   Yeah, I actually think it would probably be better to allow both `uris` and `prefixes` in the cloud file input sources, and in any others that match this pattern; I'm not sure why we only allow one or the other currently.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479611
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
 ##########
 @@ -48,23 +48,23 @@
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      Stream<InputEntity> sourceStream,
+      Iterator<? extends InputEntity> sourceStream,
 
 Review comment:
   Fixed.



[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479972
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -131,14 +197,15 @@ public boolean equals(Object o)
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    LocalInputSource source = (LocalInputSource) o;
-    return Objects.equals(baseDir, source.baseDir) &&
-           Objects.equals(filter, source.filter);
+    LocalInputSource that = (LocalInputSource) o;
+    return Objects.equals(baseDir, that.baseDir) &&
+           Objects.equals(filter, that.filter) &&
+           Objects.equals(files, that.files);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(baseDir, filter);
+    return Objects.hash(baseDir, filter, files);
   }
 
 Review comment:
   Added.



[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381499664
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
 
 Review comment:
   Extra "and"
   ```suggestion
   the whole indexing process. It splits the input data and issues worker tasks
   ```



[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382869968
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -21,40 +21,64 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.filefilter.AndFileFilter;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.NameFileFilter;
+import org.apache.commons.io.filefilter.NotFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
-public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<File>
+public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
 
 Review comment:
   Can you add this new property to the LocalInputSource property docs?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479887
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
 
 Review comment:
   I don't understand how it works. `peeking` keeps the last input fetched from the underlying iterator, because whether it can be returned depends on the total size of the inputs already in the `current` list. If the last fetched input was not added, it should be returned by the following `next()` call.
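   To make the carry-over concrete, here is a minimal, self-contained sketch of the same peeking pattern (illustrative names only, mirroring the logic quoted above rather than the committed code):
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.ToLongFunction;
   
   public class SizeBatchingIteratorSketch
   {
     // Groups elements into batches whose total size stays under maxSize, except that an
     // empty batch always accepts its first element (so an oversized file gets its own batch).
     public static <T> Iterator<List<T>> batchBySize(Iterator<T> in, ToLongFunction<T> sizeOf, long maxSize)
     {
       return new Iterator<List<T>>()
       {
         private T pending; // last element fetched from `in` but not yet placed in a batch
   
         @Override
         public boolean hasNext()
         {
           return pending != null || in.hasNext();
         }
   
         @Override
         public List<T> next()
         {
           final List<T> batch = new ArrayList<>();
           long batchSize = 0;
           while (pending != null || in.hasNext()) {
             if (pending == null) {
               pending = in.next();
             }
             final long size = sizeOf.applyAsLong(pending);
             if (batch.isEmpty() || batchSize + size < maxSize) {
               batch.add(pending);
               batchSize += size;
               pending = null;   // consumed
             } else {
               break;            // keep `pending` for the next batch
             }
           }
           return batch;
         }
       };
     }
   
     public static void main(String[] args)
     {
       // Sizes 200, 200, 200, 700 with maxSize 500 -> [200, 200], [200], [700]
       batchBySize(Arrays.asList(200L, 200L, 200L, 700L).iterator(), Long::longValue, 500L)
           .forEachRemaining(System.out::println);
     }
   }
   ```
   The 200 that does not fit into the first batch is held in `pending` and becomes the first element returned by the following `next()` call, which is exactly the carry-over described above.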

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382949983
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
 ##########
 @@ -56,6 +57,12 @@ public long getMaxInputSegmentBytesPerTask()
     return maxInputSegmentBytesPerTask;
   }
 
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
 
 Review comment:
   Seems like this method really doesn't belong here if not all subclasses or implementations need it? Or should this class be abstract instead?
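   For illustration, assuming the two hint specs really do behave the same (as noted further down in this thread), one way to avoid a duplicated implementation would be delegation; a hypothetical sketch, not the actual resolution:
   ```java
   import org.apache.druid.data.input.InputFileAttribute;
   import org.apache.druid.data.input.MaxSizeSplitHintSpec;
   
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.Function;
   
   // Hypothetical stand-in showing SegmentsSplitHintSpec#split delegating instead of re-implementing.
   public class DelegatingSegmentsSplitHintSpecSketch
   {
     private final long maxInputSegmentBytesPerTask;
   
     public DelegatingSegmentsSplitHintSpecSketch(long maxInputSegmentBytesPerTask)
     {
       this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask;
     }
   
     // Reuses MaxSizeSplitHintSpec, which already packs inputs up to a byte limit per split.
     public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
     {
       return new MaxSizeSplitHintSpec(maxInputSegmentBytesPerTask).split(inputIterator, inputAttributeExtractor);
     }
   }
   ```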

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381593900
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/SpecificFilesLocalInputSource.java
 ##########
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFileAttribute;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.druid.utils.Streams;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public class SpecificFilesLocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 
 Review comment:
   If it isn't too much trouble, it seems like this would be better to just be a part of `LocalInputSource` to be more consistent with the cloud file input sources, rather than introducing a new type. Though if it is needlessly complicated then it is probably fine as is.
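   For what it's worth, folding the explicit file list into `LocalInputSource` need not be complicated; a rough, hypothetical sketch of combining the baseDir scan with an explicit file set, using the commons-io and Guava utilities already imported in the diff:
   ```java
   import com.google.common.collect.Iterators;
   import org.apache.commons.io.FileUtils;
   import org.apache.commons.io.filefilter.TrueFileFilter;
   import org.apache.commons.io.filefilter.WildcardFileFilter;
   
   import java.io.File;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.Set;
   
   public class CombinedLocalFilesSketch
   {
     // Concatenates the recursive baseDir scan (filtered by a wildcard) with the explicit file set.
     // Note: a file matched by both would appear twice here; de-duplication is left out of the sketch.
     static Iterator<File> listCandidateFiles(File baseDir, String filter, Set<File> files)
     {
       final Iterator<File> scanned = baseDir == null
           ? Collections.<File>emptyIterator()
           : FileUtils.iterateFiles(baseDir.getAbsoluteFile(), new WildcardFileFilter(filter), TrueFileFilter.INSTANCE);
       final Iterator<File> explicit = files == null ? Collections.<File>emptyIterator() : files.iterator();
       return Iterators.concat(scanned, explicit);
     }
   }
   ```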

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479567
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
+
+      @Override
+      public boolean hasNext()
+      {
+        return peeking != null || inputIterator.hasNext();
+      }
+
+      @Override
+      public List<T> next()
+      {
+        final List<T> current = new ArrayList<>();
+        long splitSize = 0;
+        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+          if (peeking == null) {
+            peeking = inputIterator.next();
+          }
+          final long size = inputAttributeExtractor.apply(peeking).getSize();
+          if (current.isEmpty()) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else if (splitSize + size < maxSplitSize) {
 
 Review comment:
   Ah good catch. Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479632
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
 ##########
 @@ -48,23 +48,23 @@
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      Stream<InputEntity> sourceStream,
+      Iterator<? extends InputEntity> sourceStream,
       File temporaryDirectory
   )
   {
-    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceStream.iterator()), temporaryDirectory);
+    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceStream), temporaryDirectory);
   }
 
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      CloseableIterator<InputEntity> sourceIterator,
+      CloseableIterator<? extends InputEntity> sourceIterator,
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479776
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
 ##########
 @@ -228,13 +232,15 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
     // segmentIds is supposed to be specified by the supervisor task during the parallel indexing.
     // If it's not null, segments are already split by the supervisor task and further split won't happen.
     if (segmentIds == null) {
-      return createSplits(
-          coordinatorClient,
-          retryPolicyFactory,
-          dataSource,
-          interval,
-          splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec
-      ).stream();
+      return Streams.sequentialStreamFrom(
+          createSplits(
+              coordinatorClient,
+              retryPolicyFactory,
+              dataSource,
+              interval,
+              splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec
 
 Review comment:
   Changed to create `MaxSizeSplitHintSpec` directly.
   
   > Does this also mean SegmentsSplitHintSpec is deprecated?
   
   Good question. `MaxSizeSplitHintSpec` and `SegmentsSplitHintSpec` work exactly the same for now, but I think `SegmentsSplitHintSpec` can be further optimized in the future. I added a comment about the future improvement.
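   A sketch of what "create `MaxSizeSplitHintSpec` directly" could look like (illustrative only; `orDefault` is a hypothetical helper, and passing `null` picks up the 512MB `DEFAULT_MAX_SPLIT_SIZE` shown earlier in this PR):
   ```java
   import org.apache.druid.data.input.MaxSizeSplitHintSpec;
   import org.apache.druid.data.input.SplitHintSpec;
   
   public class DefaultSplitHintSpecSketch
   {
     // Falls back to MaxSizeSplitHintSpec when the task did not specify a split hint spec.
     static SplitHintSpec orDefault(SplitHintSpec splitHintSpec)
     {
       return splitHintSpec == null ? new MaxSizeSplitHintSpec(null) : splitHintSpec;
     }
   }
   ```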

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382950480
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
 ##########
 @@ -56,6 +63,35 @@ public String getSegmentId()
   @JsonProperty
   public List<Interval> getIntervals()
   {
-    return intervals;
+    return Collections.unmodifiableList(intervals);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WindowedSegmentId segmentId1 = (WindowedSegmentId) o;
+    return Objects.equals(segmentId, segmentId1.segmentId) &&
+           Objects.equals(intervals, segmentId1.intervals);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(segmentId, intervals);
+  }
 
 Review comment:
   Tests for equals and hashCode, please.
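   A sketch of such a test, assuming the EqualsVerifier library is available on the test classpath; otherwise a plain JUnit test comparing two equal and two unequal instances would do the same job:
   ```java
   import nl.jqno.equalsverifier.EqualsVerifier;
   import org.apache.druid.indexing.firehose.WindowedSegmentId;
   import org.junit.Test;
   
   public class WindowedSegmentIdTest
   {
     @Test
     public void testEqualsAndHashCode()
     {
       // Checks the full equals/hashCode contract (reflexivity, symmetry, consistency with hashCode).
       // Depending on the class's field modifiers, .suppress(Warning.NONFINAL_FIELDS) may be needed.
       EqualsVerifier.forClass(WindowedSegmentId.class)
                     .usingGetClass()
                     .verify();
     }
   }
   ```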

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381829041
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -34,28 +39,46 @@
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CollectionUtils;
 import org.apache.druid.utils.Streams;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class LocalInputSource extends AbstractInputSource implements SplittableInputSource<List<File>>
 {
   private final File baseDir;
   private final String filter;
+  private final Set<File> files;
 
   @JsonCreator
   public LocalInputSource(
       @JsonProperty("baseDir") File baseDir,
-      @JsonProperty("filter") String filter
+      @JsonProperty("filter") String filter,
+      @JsonProperty("files") Set<File> files
   )
   {
-    this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
-    this.filter = Preconditions.checkNotNull(filter, "filter");
+    this.baseDir = baseDir;
+    this.filter = baseDir != null ? Preconditions.checkNotNull(filter, "filter") : filter;
+    this.files = files;
+
+    if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) {
+      throw new IAE("Either one of baseDir or files should be specified");
 
 Review comment:
   Is it better to accept both baseDir + filter and an explicit files list, or should you specify one or the other exclusively?
   
   If you think accepting both is better, then this exception message should probably say 'At least one of ...' instead of 'Either one of'.
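   If both end up being accepted, the validation with the suggested wording might look roughly like this (a hypothetical helper, not the PR's actual code):
   ```java
   import com.google.common.base.Preconditions;
   
   import java.io.File;
   import java.util.Set;
   
   public class LocalInputSourceValidationSketch
   {
     static void validate(File baseDir, String filter, Set<File> files)
     {
       if (baseDir == null && (files == null || files.isEmpty())) {
         // "At least one of" rather than "Either one of", since both may be given together.
         throw new IllegalArgumentException("At least one of baseDir or files should be specified");
       }
       if (baseDir != null) {
         Preconditions.checkNotNull(filter, "filter is required when baseDir is specified");
       }
     }
   }
   ```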

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382233119
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
+to the Overlord which actually process the assigned input split and create segments.
+Once a worker task successfully processes all assigned input split, it reports the generated segment list to the supervisor task. 
 
 Review comment:
   Thanks for taking a look! 
   
   > If not, for a lighter edit, maybe just clarify that it's the worker tasks more specifically, rather than the overlord, that is processing input splits (if that's the case).
   
   This is correct. I tried to make it more clear.
   
   ```
   The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid’s resource and
   doesn’t depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task that orchestrates
   the whole indexing process. The supervisor task splits the input data and creates worker tasks to process those splits.
   The created worker tasks are issued to the Overlord so that they can be scheduled and run on MiddleManagers or Indexers.
   Once a worker task successfully processes the assigned input split, it reports the generated segment list to the supervisor task.
   The supervisor task periodically checks the status of worker tasks. If one of them fails, it retries the failed task
   until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382872400
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
 ##########
 @@ -228,13 +232,15 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
     // segmentIds is supposed to be specified by the supervisor task during the parallel indexing.
     // If it's not null, segments are already split by the supervisor task and further split won't happen.
     if (segmentIds == null) {
-      return createSplits(
-          coordinatorClient,
-          retryPolicyFactory,
-          dataSource,
-          interval,
-          splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec
-      ).stream();
+      return Streams.sequentialStreamFrom(
+          createSplits(
+              coordinatorClient,
+              retryPolicyFactory,
+              dataSource,
+              interval,
+              splitHintSpec == null ? new SegmentsSplitHintSpec(null) : splitHintSpec
 
 Review comment:
   Since it would get converted into a MaxSizeSplitHintSpec in createSplit, could this create a MaxSizeSplitHintSpec directly? (Does this also mean SegmentsSplitHintSpec is deprecated?)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r383479956
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
 ##########
 @@ -56,6 +57,12 @@ public long getMaxInputSegmentBytesPerTask()
     return maxInputSegmentBytesPerTask;
   }
 
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
 
 Review comment:
   Added a comment about it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381508140
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -42,11 +42,12 @@ demonstrates the "simple" (single-task) mode.
 ## Parallel task
 
 The Parallel task (type `index_parallel`) is a task for parallel batch indexing. This task only uses Druid's resource and
-doesn't depend on other external systems like Hadoop. `index_parallel` task is a supervisor task which basically creates
-multiple worker tasks and submits them to the Overlord. Each worker task reads input data and creates segments. Once they
-successfully generate segments for all input data, they report the generated segment list to the supervisor task. 
+doesn't depend on other external systems like Hadoop. The `index_parallel` task is a supervisor task which orchestrates
+the whole indexing process. It splits the input data and and issues worker tasks
+to the Overlord which actually process the assigned input split and create segments.
+Once a worker task successfully processes all assigned input split, it reports the generated segment list to the supervisor task. 
 The supervisor task periodically checks the status of worker tasks. If one of them fails, it retries the failed task
-until the number of retries reaches to the configured limit. If all worker tasks succeed, then it publishes the reported segments at once and finalize the ingestion.
+until the number of retries reaches to the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalize the ingestion.
 
 Review comment:
   light edit: "...until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes the ingestion."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382870128
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
 ##########
 @@ -48,23 +48,23 @@
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      Stream<InputEntity> sourceStream,
+      Iterator<? extends InputEntity> sourceStream,
 
 Review comment:
   nit: could call this `sourceIterator`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382950270
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
 ##########
 @@ -131,14 +197,15 @@ public boolean equals(Object o)
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    LocalInputSource source = (LocalInputSource) o;
-    return Objects.equals(baseDir, source.baseDir) &&
-           Objects.equals(filter, source.filter);
+    LocalInputSource that = (LocalInputSource) o;
+    return Objects.equals(baseDir, that.baseDir) &&
+           Objects.equals(filter, that.filter) &&
+           Objects.equals(files, that.files);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(baseDir, filter);
+    return Objects.hash(baseDir, filter, files);
   }
 
 Review comment:
   equals and hashCode need unit tests for maintainability.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r381510488
 
 

 ##########
 File path: docs/ingestion/native-batch.md
 ##########
 @@ -226,7 +224,14 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 `SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
 Note that each worker task processes a single input split. You can control the amount of data each worker task will read during the first phase.
 
-Currently only one splitHintSpec, i.e., `segments`, is available.
+#### `MaxSizeSplitHintSpec`
+
+`MaxSizeSplitHintSpec` is respected by all splittable input sources except for the HTTP input source.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `maxSize`.|none|yes|
+|maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (splitting a large file is not supported yet).|500MB|no|
 
 Review comment:
   Could this match the wording used below, so: 
   "...in a single task. (Files are never split across tasks.)"
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382860923
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
+
+      @Override
+      public boolean hasNext()
+      {
+        return peeking != null || inputIterator.hasNext();
+      }
+
+      @Override
+      public List<T> next()
+      {
+        final List<T> current = new ArrayList<>();
+        long splitSize = 0;
+        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+          if (peeking == null) {
+            peeking = inputIterator.next();
+          }
+          final long size = inputAttributeExtractor.apply(peeking).getSize();
+          if (current.isEmpty()) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else if (splitSize + size < maxSplitSize) {
 
 Review comment:
   Looks like the `splitSize + size < maxSplitSize` and `current.isEmpty()` branches can be combined.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing

Posted by GitBox <gi...@apache.org>.
jnaous commented on a change in pull request #9360: Create splits of multiple files for parallel indexing
URL: https://github.com/apache/druid/pull/9360#discussion_r382949842
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/data/input/MaxSizeSplitHintSpec.java
 ##########
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A SplitHintSpec that can create splits of multiple files.
+ * A split created by this class can have one or more input files.
+ * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
+ * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
+ */
+public class MaxSizeSplitHintSpec implements SplitHintSpec
+{
+  public static final String TYPE = "maxSize";
+
+  @VisibleForTesting
+  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
+
+  private final long maxSplitSize;
+
+  @JsonCreator
+  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
+  {
+    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
+  }
+
+  @JsonProperty
+  public long getMaxSplitSize()
+  {
+    return maxSplitSize;
+  }
+
+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
+  {
+    return new Iterator<List<T>>()
+    {
+      private T peeking;
+
+      @Override
+      public boolean hasNext()
+      {
+        return peeking != null || inputIterator.hasNext();
+      }
+
+      @Override
+      public List<T> next()
+      {
+        final List<T> current = new ArrayList<>();
+        long splitSize = 0;
+        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
+          if (peeking == null) {
+            peeking = inputIterator.next();
+          }
+          final long size = inputAttributeExtractor.apply(peeking).getSize();
+          if (current.isEmpty()) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else if (splitSize + size < maxSplitSize) {
+            current.add(peeking);
+            splitSize += size;
+            peeking = null;
+          } else {
+            break;
+          }
+        }
+        assert !current.isEmpty();
+        return current;
+      }
+    };
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MaxSizeSplitHintSpec that = (MaxSizeSplitHintSpec) o;
+    return maxSplitSize == that.maxSplitSize;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxSplitSize);
+  }
+}
 
 Review comment:
   equals and hashCode need unit tests

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org