Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/05/23 11:35:51 UTC

[GitHub] [incubator-druid] leventov commented on a change in pull request #4238: Early publishing segments in the middle of data ingestion

leventov commented on a change in pull request #4238: Early publishing segments in the middle of data ingestion
URL: https://github.com/apache/incubator-druid/pull/4238#discussion_r286899300
 
 

 ##########
 File path: indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
 ##########
 @@ -214,61 +216,182 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
     }
   }
 
+  private static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
+  {
+    Preconditions.checkState(
+        !(tuningConfig.isForceGuaranteedRollup() &&
+          (tuningConfig.isForceExtendableShardSpecs() || ioConfig.isAppendToExisting())),
+        "Perfect rollup cannot be guaranteed with extendable shardSpecs"
+    );
+    return tuningConfig.isForceGuaranteedRollup();
+  }
+
   /**
-   * Determines the number of shards for each interval using a hash of queryGranularity timestamp + all dimensions (i.e
-   * hash-based partitioning). In the future we may want to also support single-dimension partitioning.
+   * Determines intervals and shardSpecs for input data.  This method first checks whether it must determine intervals and
+   * shardSpecs by itself.  Intervals must be determined if they are not specified in {@link GranularitySpec}.
+   * ShardSpecs must be determined if the perfect rollup must be guaranteed but the number of shards is not
+   * specified in {@link IndexTuningConfig}.
+   * <p/>
+   * If neither intervals nor shardSpecs need to be determined, this method simply returns {@link ShardSpecs} for the
+   * given intervals.  Here, if {@link IndexTuningConfig#numShards} is not specified, {@link NumberedShardSpec} is used.
+   * <p/>
+   * If either intervals or shardSpecs need to be determined, this method reads the entire input to determine one of
+   * them.  If the perfect rollup must be guaranteed, {@link HashBasedNumberedShardSpec} is used for hash partitioning
+   * of input data.  In the future we may want to also support single-dimension partitioning.
+   *
+   * @return generated {@link ShardSpecs} representing a map of intervals and corresponding shard specs
    */
-  private Map<Interval, List<ShardSpec>> determineShardSpecs(
+  private ShardSpecs determineShardSpecs(
       final TaskToolbox toolbox,
       final FirehoseFactory firehoseFactory,
       final File firehoseTempDir
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getObjectMapper();
+    final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
+
     final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
-    final Granularity queryGranularity = granularitySpec.getQueryGranularity();
-    final boolean determineNumPartitions = ingestionSchema.getTuningConfig().getNumShards() == null;
-    final boolean determineIntervals = !ingestionSchema.getDataSchema()
-                                                       .getGranularitySpec()
-                                                       .bucketIntervals()
-                                                       .isPresent();
 
-    final Map<Interval, List<ShardSpec>> shardSpecs = Maps.newHashMap();
+    final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
+    // Guaranteed rollup means that this index task guarantees the 'perfect rollup' across the entire data set.
+    final boolean guaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
+    final boolean determineNumPartitions = tuningConfig.getNumShards() == null && guaranteedRollup;
+    final boolean useExtendableShardSpec = !guaranteedRollup;
 
     // if we were given number of shards per interval and the intervals, we don't need to scan the data
     if (!determineNumPartitions && !determineIntervals) {
-      log.info("numShards and intervals provided, skipping determine partition scan");
-      final SortedSet<Interval> intervals = ingestionSchema.getDataSchema()
-                                                           .getGranularitySpec()
-                                                           .bucketIntervals()
-                                                           .get();
-      final int numShards = ingestionSchema.getTuningConfig().getNumShards();
-
-      for (Interval interval : intervals) {
-        final List<ShardSpec> intervalShardSpecs = Lists.newArrayListWithCapacity(numShards);
-        if (numShards > 1) {
-          for (int i = 0; i < numShards; i++) {
-            intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
-          }
-        } else {
-          intervalShardSpecs.add(NoneShardSpec.instance());
-        }
-        shardSpecs.put(interval, intervalShardSpecs);
+      log.info("Skipping determine partition scan");
+      return createShardSpecWithoutInputScan(jsonMapper, granularitySpec, tuningConfig, useExtendableShardSpec);
+    } else {
+      // determine intervals containing data and prime HLL collectors
+      return createShardSpecsFromInput(
+          jsonMapper,
+          ingestionSchema,
+          firehoseFactory,
+          firehoseTempDir,
+          granularitySpec,
+          tuningConfig,
+          determineIntervals,
+          determineNumPartitions,
+          useExtendableShardSpec
+      );
+    }
+  }
+
+  private static ShardSpecs createShardSpecWithoutInputScan(
+      ObjectMapper jsonMapper,
+      GranularitySpec granularitySpec,
+      IndexTuningConfig tuningConfig,
+      boolean useExtendableShardSpec
+  )
+  {
+    final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
+    final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
+        useExtendableShardSpec,
+        numShards,
+        jsonMapper
+    );
+
+    final Map<Interval, List<ShardSpec>> intervalToShardSpecs = new HashMap<>();
+    for (Interval interval : granularitySpec.bucketIntervals().get()) {
+      final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
+                                                          .mapToObj(
+                                                              shardId -> shardSpecCreateFn.apply(shardId, numShards)
+                                                          )
+                                                          .collect(Collectors.toList());
+      intervalToShardSpecs.put(interval, intervalShardSpecs);
+    }
+
+    if (useExtendableShardSpec) {
+      return createExtendableShardSpecs(intervalToShardSpecs);
+    } else {
+      return createNonExtendableShardSpecs(intervalToShardSpecs);
+    }
+  }
+
+  private static ShardSpecs createShardSpecsFromInput(
+      ObjectMapper jsonMapper,
+      IndexIngestionSpec ingestionSchema,
+      FirehoseFactory firehoseFactory,
+      File firehoseTempDir,
+      GranularitySpec granularitySpec,
+      IndexTuningConfig tuningConfig,
+      boolean determineIntervals,
+      boolean determineNumPartitions,
+      boolean useExtendableShardSpec
+  ) throws IOException
+  {
+    log.info("Determining intervals and shardSpecs");
+    long determineShardSpecsStartMillis = System.currentTimeMillis();
+
+    final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = collectIntervalsAndShardSpecs(
+        jsonMapper,
+        ingestionSchema,
+        firehoseFactory,
+        firehoseTempDir,
+        granularitySpec,
+        determineIntervals,
+        determineNumPartitions
+    );
+
+    final Map<Interval, List<ShardSpec>> intervalToShardSpecs = new HashMap<>();
+    final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
+    for (final Map.Entry<Interval, Optional<HyperLogLogCollector>> entry : hllCollectors.entrySet()) {
+      final Interval interval = entry.getKey();
+      final Optional<HyperLogLogCollector> collector = entry.getValue();
+
+      final int numShards;
+      if (determineNumPartitions) {
+        final long numRows = new Double(collector.get().estimateCardinality()).longValue();
+        numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
+        log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
+      } else {
+        numShards = defaultNumShards;
+        log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
       }
 
-      return shardSpecs;
+      final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
+          useExtendableShardSpec,
+          numShards,
+          jsonMapper
+      );
+
+      final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
+                                                          .mapToObj(
+                                                              shardId -> shardSpecCreateFn.apply(shardId, numShards)
+                                                          ).collect(Collectors.toList());
+
+      intervalToShardSpecs.put(interval, intervalShardSpecs);
+    }
+    log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis);
+
+    if (useExtendableShardSpec) {
+      return createExtendableShardSpecs(intervalToShardSpecs);
+    } else {
+      return createNonExtendableShardSpecs(intervalToShardSpecs);
     }
+  }
 
-    // determine intervals containing data and prime HLL collectors
-    final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
+  private static Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSpecs(
+      ObjectMapper jsonMapper,
+      IndexIngestionSpec ingestionSchema,
+      FirehoseFactory firehoseFactory,
+      File firehoseTempDir,
+      GranularitySpec granularitySpec,
+      boolean determineIntervals,
+      boolean determineNumPartitions
+  ) throws IOException
+  {
+    final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = new TreeMap<>(
+        Comparators.intervalsByStartThenEnd()
+    );
     int thrownAway = 0;
 
 Review comment:
  This variable is never updated, so the if statement below that checks whether it is positive is dead code. See #7737.
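
  For illustration, the pattern being described is roughly the following (a simplified, hypothetical sketch, not the actual IndexTask source): a counter is declared and later checked, but nothing on any code path increments it, so the branch guarded by the check can never execute. Fixing it means either restoring the increment on the path where rows are actually thrown away, or removing both the counter and the check.

    // Simplified, self-contained sketch of the dead-code pattern flagged above.
    // All names and messages are illustrative; this is not the actual Druid code.
    import java.util.Arrays;
    import java.util.List;

    public class DeadCounterSketch
    {
      public static void main(String[] args)
      {
        final List<Long> timestamps = Arrays.asList(1L, 5L, 42L);
        int thrownAway = 0;  // declared once, but never incremented below

        for (Long timestamp : timestamps) {
          if (timestamp < 0) {
            // A row outside the configured intervals is skipped here, but this
            // path no longer does "thrownAway++", so the counter stays at 0.
            continue;
          }
          // ... per-row processing would go here ...
        }

        // Dead code: thrownAway is always 0 at this point, so the warning
        // below can never be logged.
        if (thrownAway > 0) {
          System.err.printf("Thrown away [%,d] rows outside the configured intervals%n", thrownAway);
        }
      }
    }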

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org