Posted to commits@druid.apache.org by "paul-rogers (via GitHub)" <gi...@apache.org> on 2023/03/27 20:27:20 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13955: Always use file sizes when determining batch ingest splits

paul-rogers commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149708335


##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation location)
     return new GoogleCloudStorageEntity(storage, location);
   }
 
-  @Override
-  protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
-  {
-    final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
-        storageObjectIterable().iterator(),
-        storageObject -> {
-          final BigInteger sizeInBigInteger = storageObject.getSize();
-          long sizeInLong;
-          if (sizeInBigInteger == null) {
-            sizeInLong = Long.MAX_VALUE;
-          } else {
-            try {
-              sizeInLong = sizeInBigInteger.longValueExact();
-            }
-            catch (ArithmeticException e) {
-              LOG.warn(
-                  e,
-                  "The object [%s, %s] has a size [%s] out of the range of the long type. "
-                  + "The max long value will be used for its size instead.",
-                  storageObject.getBucket(),
-                  storageObject.getName(),
-                  sizeInBigInteger
-              );
-              sizeInLong = Long.MAX_VALUE;
-            }
-          }
-          return new InputFileAttribute(sizeInLong);
-        }
-    );
-
-    return Streams.sequentialStreamFrom(splitIterator)
-                  .map(objects -> objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
-                  .map(InputSplit::new);
-  }
-
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
     return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
   }
 
-  private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
+  @Override
+  protected CloudObjectSplitWidget getSplitWidget()
   {
-    return GoogleUtils.objectToCloudObjectLocation(storageObject);
+    class SplitWidget implements CloudObjectSplitWidget
+    {
+      @Override
+      public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
+      {
+        return Iterators.transform(
+            GoogleUtils.lazyFetchingStorageObjectsIterator(
+                storage,
+                prefixes.iterator(),
+                inputDataConfig.getMaxListingLength()
+            ),
+            object -> new LocationWithSize(object.getBucket(), object.getName(), getSize(object))
+        );
+      }
+
+      @Override
+      public long getObjectSize(CloudObjectLocation location) throws IOException
+      {
+        final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
+        return getSize(storageObject);
+      }
+    }
+
+    return new SplitWidget();
   }
 
-  private Iterable<StorageObject> storageObjectIterable()
+  private static long getSize(final StorageObject object)
   {
-    return () -> {
-      Iterator<StorageObject> iterator = GoogleUtils.lazyFetchingStorageObjectsIterator(
-          storage,
-          getPrefixes().iterator(),
-          inputDataConfig.getMaxListingLength()
-      );
+    final BigInteger sizeInBigInteger = object.getSize();
 
-      // Skip files that didn't match glob filter.
-      if (StringUtils.isNotBlank(getObjectGlob())) {
-        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
-
-        iterator = Iterators.filter(
-            iterator,
-            object -> m.matches(Paths.get(object.getName()))
+    if (sizeInBigInteger == null) {
+      return Long.MAX_VALUE;

Review Comment:
   Given how the size is now used, is this the right answer? A naive splitter will create a huge number of splits for a file of size 2^63. Should we use a special value of, say, -1 (or `null`) to say that the size is unknown? If the size is unknown, how should it be split? Just one task and hope for the best? Under what circumstances would the size be unknown?
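   
   For illustration, a minimal sketch of the sentinel approach (hypothetical helper, not code from this PR; how a splitter should treat an unknown size is exactly the open question):
   
   ```java
   import java.math.BigInteger;
   
   class UnknownSizeSketch
   {
     static final long UNKNOWN_SIZE = -1;
   
     // Map a missing or over-long reported size to an explicit "unknown" sentinel
     // instead of pretending the object is 2^63 - 1 bytes.
     static long sizeOf(BigInteger reported)
     {
       if (reported == null || reported.bitLength() > 63) {
         return UNKNOWN_SIZE;
       }
       return reported.longValueExact();
     }
   
     // One possible policy: an unknown-size object contributes minimal weight and so
     // ends up in a single split, rather than being weighted as Long.MAX_VALUE and
     // blowing up the split count.
     static long weightForSplitting(long size)
     {
       return size == UNKNOWN_SIZE ? 1 : size;
     }
   }
   ```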



##########
extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java:
##########
@@ -66,6 +67,11 @@ public InputStream get(final String bucket, final String path, long start) throw
     return inputStream;
   }
 
+  public StorageObject getMetadata(final String bucket, final String path) throws IOException
+  {
+    return storage.get().objects().get(bucket, path).execute();

Review Comment:
   Other paths retry. Should this one? Or, is the retry built in somewhere? (Or, is Google so reliable that a retry isn't needed? What about internal network glitches?)
   
   What happens if the object does not exist? Is the intent to fail the ingest? Or, skip missing files? If skip, how will the user know which files were skipped?
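   
   For reference, a retry wrapper along these lines could look like the following (plain-Java sketch with an assumed backoff policy; Druid's own retry utilities may already cover this):
   
   ```java
   import java.io.IOException;
   import java.util.concurrent.Callable;
   
   final class RetrySketch
   {
     // Retry an IO call a few times with capped exponential backoff, rethrowing the last failure.
     static <T> T withRetries(Callable<T> call, int maxTries) throws IOException
     {
       for (int attempt = 1; ; attempt++) {
         try {
           return call.call();
         }
         catch (IOException e) {
           if (attempt >= maxTries) {
             throw e;
           }
         }
         catch (Exception e) {
           throw new IOException(e); // treat non-IO failures as fatal in this sketch
         }
         try {
           Thread.sleep(Math.min(1_000L << (attempt - 1), 30_000L)); // 1s, 2s, 4s, ... capped at 30s
         }
         catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while retrying", ie);
         }
       }
     }
   }
   
   // Hypothetical use at this call site:
   // return RetrySketch.withRetries(() -> storage.get().objects().get(bucket, path).execute(), 3);
   ```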



##########
processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectSplitWidget.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Helper used by {@link CloudObjectInputSource} to implement {@link SplittableInputSource#createSplits}.
+ */
+public interface CloudObjectSplitWidget
+{
+  long UNKNOWN_SIZE = -1;

Review Comment:
   Looks like we do have an unknown size. Yet, in Google, we use the max long value when the size is unknown. Do we have a consistent story?



##########
extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java:
##########
@@ -318,6 +324,13 @@ public void testSerdeWithOtherOtherInvalidArgs()
   @Test
   public void testWithUrisSplit()
   {
+    EasyMock.reset(OSSCLIENT);

Review Comment:
   How would we test this to make sure it actually works against Alibaba Object Storage Service? We know that it works against mocks. Does it work against the real service?
   
   There is no test here to limit the number of items returned. Should there be?



##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation location)
     return new GoogleCloudStorageEntity(storage, location);
   }
 
-  @Override
-  protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
-  {
-    final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
-        storageObjectIterable().iterator(),
-        storageObject -> {
-          final BigInteger sizeInBigInteger = storageObject.getSize();
-          long sizeInLong;
-          if (sizeInBigInteger == null) {
-            sizeInLong = Long.MAX_VALUE;
-          } else {
-            try {
-              sizeInLong = sizeInBigInteger.longValueExact();
-            }
-            catch (ArithmeticException e) {
-              LOG.warn(
-                  e,
-                  "The object [%s, %s] has a size [%s] out of the range of the long type. "
-                  + "The max long value will be used for its size instead.",
-                  storageObject.getBucket(),
-                  storageObject.getName(),
-                  sizeInBigInteger
-              );
-              sizeInLong = Long.MAX_VALUE;
-            }
-          }
-          return new InputFileAttribute(sizeInLong);
-        }
-    );
-
-    return Streams.sequentialStreamFrom(splitIterator)
-                  .map(objects -> objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
-                  .map(InputSplit::new);
-  }
-
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
     return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
   }
 
-  private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
+  @Override
+  protected CloudObjectSplitWidget getSplitWidget()
   {
-    return GoogleUtils.objectToCloudObjectLocation(storageObject);
+    class SplitWidget implements CloudObjectSplitWidget
+    {
+      @Override
+      public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
+      {
+        return Iterators.transform(
+            GoogleUtils.lazyFetchingStorageObjectsIterator(
+                storage,
+                prefixes.iterator(),
+                inputDataConfig.getMaxListingLength()

Review Comment:
   This item places a limit on the number of objects returned? During an MSQ ingestion, how would I know which files were truncated? This config is an instance of something that helps the _implementation_ but creates a mess for _users_. If I have to ingest 100K files, then I have to ingest that many: it doesn't help if the system decides to call it quits at 10K and neither tells me nor provides a way to resume with the others.
   
   Instead, should we count the files at plan time and fail the query if the number is too large? What would the user do in that case? How could the user partition the input to fit within the limit?
   
   If, on the other hand, the default limit is infinite, how does setting a lower limit help the user who finds they have to adjust the value?
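   
   As a sketch of the "count at plan time and fail" alternative (illustrative only; the error wording and where such a check would live are assumptions):
   
   ```java
   import java.util.Iterator;
   
   final class PlanTimeLimitSketch
   {
     // Fail planning loudly if the listing matches more objects than the limit,
     // instead of silently working with a truncated listing.
     static <T> int requireAtMost(Iterator<T> listedObjects, int maxObjects)
     {
       int count = 0;
       while (listedObjects.hasNext()) {
         listedObjects.next();
         if (++count > maxObjects) {
           throw new IllegalArgumentException(
               "Input matches more than " + maxObjects + " objects; "
               + "narrow the prefixes or objectGlob, or raise the limit"
           );
         }
       }
       return count;
     }
   }
   ```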



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java:
##########
@@ -59,91 +55,193 @@ public boolean canSliceDynamic(InputSpec inputSpec)
   public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
   {
     final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
-    final InputSource inputSource = externalInputSpec.getInputSource();
-    final InputFormat inputFormat = externalInputSpec.getInputFormat();
-    final RowSignature signature = externalInputSpec.getSignature();
-
-    // Worker number -> input source for that worker.
-    final List<List<InputSource>> workerInputSourcess;
-
-    // Figure out input splits for each worker.
-    if (inputSource.isSplittable()) {
-      //noinspection unchecked
-      final SplittableInputSource<Object> splittableInputSource = (SplittableInputSource<Object>) inputSource;
-
-      try {
-        workerInputSourcess = SlicerUtils.makeSlices(
-            splittableInputSource.createSplits(inputFormat, FilePerSplitHintSpec.INSTANCE)
-                                 .map(splittableInputSource::withSplit)
-                                 .iterator(),
-            maxNumSlices
-        );
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      workerInputSourcess = Collections.singletonList(Collections.singletonList(inputSource));
-    }
 
-    // Sanity check. It is a bug in this method if this exception is ever thrown.
-    if (workerInputSourcess.size() > maxNumSlices) {
-      throw new ISE("Generated too many slices [%d > %d]", workerInputSourcess.size(), maxNumSlices);
+    if (externalInputSpec.getInputSource().isSplittable()) {
+      return sliceSplittableInputSource(
+          externalInputSpec,
+          new StaticSplitHintSpec(maxNumSlices),
+          maxNumSlices
+      );
+    } else {
+      return sliceUnsplittableInputSource(externalInputSpec);
     }
-
-    return IntStream.range(0, maxNumSlices)
-                    .mapToObj(
-                        workerNumber -> {
-                          final List<InputSource> workerInputSources;
-
-                          if (workerNumber < workerInputSourcess.size()) {
-                            workerInputSources = workerInputSourcess.get(workerNumber);
-                          } else {
-                            workerInputSources = Collections.emptyList();
-                          }
-
-                          if (workerInputSources.isEmpty()) {
-                            return NilInputSlice.INSTANCE;
-                          } else {
-                            return new ExternalInputSlice(workerInputSources, inputFormat, signature);
-                          }
-                        }
-                    )
-                    .collect(Collectors.toList());
   }
 
   @Override
   public List<InputSlice> sliceDynamic(
-      InputSpec inputSpec,
-      int maxNumSlices,
-      int maxFilesPerSlice,
-      long maxBytesPerSlice
+      final InputSpec inputSpec,
+      final int maxNumSlices,
+      final int maxFilesPerSlice,
+      final long maxBytesPerSlice
   )
   {
     final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
 
-    if (!externalInputSpec.getInputSource().isSplittable()) {
-      return sliceStatic(inputSpec, 1);
+    if (externalInputSpec.getInputSource().isSplittable()) {
+      return sliceSplittableInputSource(
+          externalInputSpec,
+          new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice),
+          maxNumSlices
+      );
+    } else {
+      return sliceUnsplittableInputSource(externalInputSpec);
     }
+  }
 
-    final SplittableInputSource<?> inputSource = (SplittableInputSource<?>) externalInputSpec.getInputSource();
-    final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
-        new HumanReadableBytes(maxBytesPerSlice),
-        maxFilesPerSlice
+  /**
+   * "Slice" an unsplittable input source into a single slice.
+   */
+  private static List<InputSlice> sliceUnsplittableInputSource(final ExternalInputSpec inputSpec)
+  {
+    return Collections.singletonList(
+        new ExternalInputSlice(
+            Collections.singletonList(inputSpec.getInputSource()),
+            inputSpec.getInputFormat(),
+            inputSpec.getSignature()
+        )
     );
+  }
+
+  /**
+   * Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
+   */
+  private static List<InputSlice> sliceSplittableInputSource(
+      final ExternalInputSpec inputSpec,
+      final SplitHintSpec splitHintSpec,
+      final int maxNumSlices

Review Comment:
   Not clear how this can be done per input source. The number of slices would seem to be a constraint based on total resources and total file count. Also, shouldn't a split be based on size, not number? For example, in the HDFS days, each split wanted to be the size of a file chunk (typically 256MB or 512MB). For cloud storage, there probably is no ideal size so the size would instead depend on the number of tasks. If we have a single 10 GB file, and 10 tasks, produce 10 slices. If we have a 10 GB file and 1 GB file, with 11 tasks, produce 1 GB slices. And so on.
   
   Perhaps all this is explained in another part of the code not visible in this PR diff...
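   
   To make the arithmetic above concrete, a back-of-the-envelope sketch (illustrative only):
   
   ```java
   final class SliceSizingSketch
   {
     // Size-based slicing: aim for roughly totalBytes / taskCount bytes per slice.
     static long targetBytesPerSlice(long totalInputBytes, int taskCount)
     {
       return Math.max(1, (totalInputBytes + taskCount - 1) / taskCount); // ceiling division
     }
   }
   
   // A single 10 GB file with 10 tasks -> ~1 GB per slice, i.e. 10 slices of that file.
   // A 10 GB file plus a 1 GB file with 11 tasks -> ~1 GB per slice as well.
   ```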



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java:
##########
@@ -19,14 +19,369 @@
 
 package org.apache.druid.msq.kernel.controller;
 
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMaps;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
 public class WorkerInputsTest
 {
+  private static final String QUERY_ID = "myQuery";
+
+  @Test
+  public void test_max_threeInputs_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec(1, 2, 3))
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.MAX
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(new TestInputSlice(1)))
+                    .put(1, Collections.singletonList(new TestInputSlice(2)))
+                    .put(2, Collections.singletonList(new TestInputSlice(3)))
+                    .put(3, Collections.singletonList(new TestInputSlice()))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_max_zeroInputs_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec())
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.MAX
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(new TestInputSlice()))
+                    .put(1, Collections.singletonList(new TestInputSlice()))
+                    .put(2, Collections.singletonList(new TestInputSlice()))
+                    .put(3, Collections.singletonList(new TestInputSlice()))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_zeroInputSpecs_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs()
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.AUTO
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_zeroInputSlices_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec())
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.AUTO
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_zeroInputSlices_broadcast_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec())
+                       .broadcastInputs(IntSet.of(0))
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.AUTO
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(new TestInputSlice()))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_threeInputs_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec(1, 2, 3))
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.AUTO
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(new TestInputSlice(1, 2, 3)))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_threeBigInputs_fourWorkers()
+  {
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(new TestInputSpec(4_000_000_000L, 4_000_000_001L, 4_000_000_002L))
+                       .maxWorkerCount(4)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        new TestInputSpecSlicer(true),
+        WorkerAssignmentStrategy.AUTO
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(0, Collections.singletonList(new TestInputSlice(4_000_000_000L, 4_000_000_001L)))
+                    .put(1, Collections.singletonList(new TestInputSlice(4_000_000_002L)))
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  @Test
+  public void test_auto_threeBigInputs_oneWorker()

Review Comment:
   Do these tests cover the limits? That is, more splits than the limit? More capacity than available?
   
   Do the tests cover skew: Many small files and one huge one?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/SlicerUtils.java:
##########
@@ -35,7 +35,7 @@
    *
    * Items are assigned round-robin.
    */
-  public static <T> List<List<T>> makeSlices(final Iterator<T> iterator, final int numSlices)
+  public static <T> List<List<T>> makeSlicesStatic(final Iterator<T> iterator, final int numSlices)

Review Comment:
   Dropping this comment here since there is no good location otherwise: we seem to be computing slices in the controller. Does the controller adjust its task ask based on input size? If so, isn't there a negotiation?
   
   * Check the inputs and compute total size & ideal task count.
   * Adjust based on some configured absolute maximum task count.
   * Ask Overlord for that number of workers. Get back an available task count (for non-auto-scale systems).
   * Revise the work allocation to bin-pack the actual available workers.
   
   Or, is the allocation more static than this, based on configured values somewhere?
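   
   To make the sequence concrete, a toy sketch of that negotiation (every type and call here is invented purely for illustration; the real controller/Overlord APIs are not shown in this PR):
   
   ```java
   import java.util.List;
   
   final class NegotiationSketch
   {
     interface Overlord
     {
       int grantWorkers(int requested); // may return fewer than requested
     }
   
     static int planWorkerCount(
         List<Long> inputSizes,
         long targetBytesPerTask,
         int configuredMaxTasks,
         Overlord overlord
     )
     {
       long totalBytes = inputSizes.stream().mapToLong(Long::longValue).sum();
       long idealTasks = Math.max(1, (totalBytes + targetBytesPerTask - 1) / targetBytesPerTask);
       int requested = (int) Math.min(idealTasks, configuredMaxTasks);
       // The work allocation would then be revised (bin-packed) around whatever is granted.
       return overlord.grantWorkers(requested);
     }
   }
   ```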



##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation location)
     return new GoogleCloudStorageEntity(storage, location);
   }
 
-  @Override
-  protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
-  {
-    final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
-        storageObjectIterable().iterator(),
-        storageObject -> {
-          final BigInteger sizeInBigInteger = storageObject.getSize();
-          long sizeInLong;
-          if (sizeInBigInteger == null) {
-            sizeInLong = Long.MAX_VALUE;
-          } else {
-            try {
-              sizeInLong = sizeInBigInteger.longValueExact();
-            }
-            catch (ArithmeticException e) {
-              LOG.warn(
-                  e,
-                  "The object [%s, %s] has a size [%s] out of the range of the long type. "
-                  + "The max long value will be used for its size instead.",
-                  storageObject.getBucket(),
-                  storageObject.getName(),
-                  sizeInBigInteger
-              );
-              sizeInLong = Long.MAX_VALUE;
-            }
-          }
-          return new InputFileAttribute(sizeInLong);
-        }
-    );
-
-    return Streams.sequentialStreamFrom(splitIterator)
-                  .map(objects -> objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
-                  .map(InputSplit::new);
-  }
-
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
     return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
   }
 
-  private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
+  @Override
+  protected CloudObjectSplitWidget getSplitWidget()
   {
-    return GoogleUtils.objectToCloudObjectLocation(storageObject);
+    class SplitWidget implements CloudObjectSplitWidget
+    {
+      @Override
+      public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
+      {
+        return Iterators.transform(
+            GoogleUtils.lazyFetchingStorageObjectsIterator(
+                storage,
+                prefixes.iterator(),
+                inputDataConfig.getMaxListingLength()
+            ),
+            object -> new LocationWithSize(object.getBucket(), object.getName(), getSize(object))
+        );
+      }
+
+      @Override
+      public long getObjectSize(CloudObjectLocation location) throws IOException
+      {
+        final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
+        return getSize(storageObject);
+      }
+    }
+
+    return new SplitWidget();
   }
 
-  private Iterable<StorageObject> storageObjectIterable()
+  private static long getSize(final StorageObject object)
   {
-    return () -> {
-      Iterator<StorageObject> iterator = GoogleUtils.lazyFetchingStorageObjectsIterator(
-          storage,
-          getPrefixes().iterator(),
-          inputDataConfig.getMaxListingLength()
-      );
+    final BigInteger sizeInBigInteger = object.getSize();
 
-      // Skip files that didn't match glob filter.
-      if (StringUtils.isNotBlank(getObjectGlob())) {
-        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
-
-        iterator = Iterators.filter(
-            iterator,
-            object -> m.matches(Paths.get(object.getName()))
+    if (sizeInBigInteger == null) {
+      return Long.MAX_VALUE;
+    } else {
+      try {
+        return sizeInBigInteger.longValueExact();
+      }
+      catch (ArithmeticException e) {
+        LOG.warn(
+            e,
+            "The object [%s, %s] has a size [%s] out of the range of the long type. "
+            + "The max long value will be used for its size instead.",
+            object.getBucket(),
+            object.getName(),
+            sizeInBigInteger
         );
+        return Long.MAX_VALUE;

Review Comment:
   Same issue as above. It is hard to believe that any file would be 2^63 bytes. It is also hard to believe that Druid could handle files beyond, say, 1 TB or so. There is probably an upper limit on the size of an object. [Indeed there is](https://cloud.google.com/storage/quotas): "Maximum object size: 5 TiB". Should we enforce a smaller limit than the one implied by the range of a `long`? What should we actually do if the object is too large?
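   
   For illustration, a cap along those lines might look like this (the 5 TiB figure is the GCS quota cited above; failing fast on an oversized or missing size is just one possible policy):
   
   ```java
   import java.math.BigInteger;
   
   final class ObjectSizeCapSketch
   {
     // GCS documents a 5 TiB maximum object size; anything larger (or missing) is suspect.
     private static final BigInteger MAX_OBJECT_BYTES = BigInteger.valueOf(5L << 40);
   
     static long checkedSize(BigInteger reported, String bucket, String name)
     {
       if (reported == null || reported.compareTo(MAX_OBJECT_BYTES) > 0) {
         throw new IllegalArgumentException(
             "Object [" + bucket + ", " + name + "] has missing or implausible size: " + reported
         );
       }
       return reported.longValueExact();
     }
   }
   ```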



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/SlicerUtils.java:
##########
@@ -98,6 +98,66 @@ public static <T> List<List<T>> makeSlices(
     return slicesList;
   }
 
+  /**
+   * Creates up to "maxNumSlices" lists from "iterator".
+   *
+   * This method creates as few slices as possible, while keeping each slice under the provided limits.
+   *
+   * If there is a conflict between maxNumSlices and maxFilesPerSlice or maxWeightPerSlice, then maxNumSlices wins.
+   * This means that for small values of maxNumSlices, slices may have more than maxFilesPerSlice files, or more
+   * than maxWeightPerSlice weight.
+   */
+  public static <T> List<List<T>> makeSlicesDynamic(
+      final Iterator<T> iterator,
+      final ToLongFunction<T> weightFunction,
+      final int maxNumSlices,
+      final int maxFilesPerSlice,
+      final long maxWeightPerSlice
+  )
+  {

Review Comment:
   The algorithm here doesn't seem to be quite what one would expect.
   
   We have a priority queue of the split lists. Cool. We fill the first split until it is full, then we start on the next one. It is not clear how, if we have n splits with a max of m, we end up with n splits when n < m, or m splits of roughly n/m blocks each otherwise.
   
   Also, we have a limit on the maximum number of slices. This means that, when the actual data exceeds that number, we need to make slices larger. (Or fail the ingest, or discard splits, or ...)
   
   I guess I would have expected to see a loop that first sums the file sizes to get t. Then, divide that by the target split size to get the ideal number of splits. If that is too large, cap the number of splits at the maximum and compute the actual split size by dividing the total by that maximum.
   
   But, of course, we're ignoring the file count. We can't have any fewer splits than the number of files. So, a first check is to count files. For each, compute the ideal split count. This gives the candidate split count. Then, adjust if the ideal count is too high.
   
   The result will be a set of files, with size and with splits. Now we can do bin packing for tasks based on these values. If the (adjusted) split count is less than the task limit, run one split per task. Else, evenly allocate sizes (not splits) per task so that each processes roughly total size / task count bytes.
   
   It is not entirely clear how the algorithm here approximates what it seems we need. Maybe a few more comments might fill in the gaps? 
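   
   For what it's worth, a rough sketch of the sizing pass described above (my reading, not the PR's algorithm; all names are made up):
   
   ```java
   import java.util.List;
   
   final class ExpectedSlicingSketch
   {
     // Sum the file sizes, derive an ideal split count from a target split size,
     // clamp by the maximum, and spread the bytes evenly over the resulting splits.
     static long bytesPerSplit(List<Long> fileSizes, long targetSplitBytes, int maxSplits)
     {
       long totalBytes = fileSizes.stream().mapToLong(Long::longValue).sum();
       long idealSplits = Math.max(1, (totalBytes + targetSplitBytes - 1) / targetSplitBytes);
       // A split never spans files in this view, so there are at least as many splits as files;
       // if that conflicts with maxSplits, the cap wins and the splits simply get larger.
       long splits = Math.min(Math.max(idealSplits, fileSizes.size()), maxSplits);
       return (totalBytes + splits - 1) / splits;
     }
   }
   ```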



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org