Posted to commits@druid.apache.org by "hqx871 (via GitHub)" <gi...@apache.org> on 2023/02/23 14:05:18 UTC

[GitHub] [druid] hqx871 opened a new pull request, #13840: add sampling factor for DeterminePartitionsJob

hqx871 opened a new pull request, #13840:
URL: https://github.com/apache/druid/pull/13840

   <!-- Thanks for trying to help us make Apache Druid be the best it can be! Please fill out as much of the following information as is possible (where relevant, and remove it when irrelevant) to help make the intention and scope of this PR clear in order to ease review. -->
   
   <!-- Please read the doc for contribution (https://github.com/apache/druid/blob/master/CONTRIBUTING.md) before making this PR. Also, once you open a PR, please _avoid using force pushes and rebasing_ since these make it difficult for reviewers to see what you've changed in response to their reviews. See [the 'If your pull request shows conflicts with master' section](https://github.com/apache/druid/blob/master/CONTRIBUTING.md#if-your-pull-request-shows-conflicts-with-master) for more details. -->
   
   
   <!-- Replace XXXX with the id of the issue fixed in this PR. Remove this section if there is no corresponding issue. Don't reference the issue in the title of this pull-request. -->
   
   <!-- If you are a committer, follow the PR action item checklist for committers:
   https://github.com/apache/druid/blob/master/dev/committer-instructions.md#pr-and-issue-action-item-checklist-for-committers. -->
   
   ### Description
   
   <!-- Describe the goal of this PR, what problem are you fixing. If there is a corresponding issue (referenced above), it's not necessary to repeat the description here, however, you may choose to keep one summary sentence. -->
   
   <!-- Describe your patch: what did you change in code? How did you fix the problem? -->
   
   <!-- If there are several relatively logically separate changes in this PR, create a mini-section for each of them. For example: -->
   
   Let's first recall that there are two types of DeterminePartitionsJob.
   - When the input data is not assumed grouped, there may be duplicate rows. We launch two MR jobs: the first is a group-by job that removes duplicate rows, and the second does a global sort to find the lower and upper bounds for the target segments.
   - When the input data is assumed grouped, we only need to launch the global-sort MR job to find the lower and upper bounds for the segments.
   
   Since we don't need segments whose size exactly equals the targetRowsPerSegment parameter, we can sample the input data to speed up the DeterminePartitionsJob. The sampling factor is applied as follows:
   - If the input data is assumed grouped, we simply sample at random on the mapper side.
   - If the input data is not assumed grouped, we sample in the mapper of the group-by job. We hash the time and all dimensions and keep a row when the hash modulo the sampling factor is zero; we don't use random sampling here because there may be duplicate rows, and every duplicate of a key must receive the same decision.
   
   For example, if we ingest 10,000,000,000 rows and targetRowsPerSegment is 5,000,000, we can sample with a factor of 500, so the DeterminePartitionsJob only needs to process about 20,000,000 rows, which saves a lot of time.
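   To make the arithmetic and the hash-and-mod rule concrete, here is a minimal, self-contained sketch. It is not the PR's actual `DeterminePartitionsJobSampler` (which uses Guava's `murmur3_32` on the serialized group key); `Arrays.hashCode` stands in for the hash function, and all names are illustrative.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;

   // Sketch of hash-and-mod sampling; Arrays.hashCode substitutes for the
   // balanced hash function (murmur3_32 in the actual PR).
   public class SamplingSketch
   {
     // Keep a row iff hash(groupKey) % samplingFactor == 0.
     // Math.floorMod avoids the negative-remainder pitfall of Java's %.
     public static boolean shouldEmitRow(byte[] groupKeyBytes, int samplingFactor)
     {
       return samplingFactor == 1
              || Math.floorMod(Arrays.hashCode(groupKeyBytes), samplingFactor) == 0;
     }

     // Rows the DeterminePartitionsJob must process after sampling.
     public static long sampledRowCount(long totalRows, int samplingFactor)
     {
       return totalRows / Math.max(samplingFactor, 1);
     }

     public static void main(String[] args)
     {
       // 10,000,000,000 rows sampled by 500 leaves 20,000,000 rows.
       System.out.println(sampledRowCount(10_000_000_000L, 500)); // 20000000

       // The decision depends only on the key's content, so duplicates of the
       // same group key are either all kept or all dropped.
       byte[] key = "2023-02-23|dimA|dimB".getBytes(StandardCharsets.UTF_8);
       System.out.println(shouldEmitRow(key, 500) == shouldEmitRow(key.clone(), 500)); // true
     }
   }
   ```

   Because the decision is a pure function of the group key, the sample is reproducible across mapper tasks and retries, which random sampling would not guarantee.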
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are corner cases and error conditions handled, such as when there are insufficient resources?
    - Class organization and design (how the logic is split between classes, inheritance, composition, design patterns)
    - Method organization and design (how the logic is split between methods, parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative name) for every design (or naming) decision point and compare the alternatives with the designs that you've implemented (or the names you've chosen) to highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in the development mailing list), link to that discussion from this PR description and explain what have changed in your final design compared to your original proposal or the consensus version in the end of the discussion. If something hasn't changed since the original discussion, you can omit a detailed discussion of those aspects of the design here, perhaps apart from brief mentioning for the sake of readability of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small changes. -->
   
   <!-- Give your best effort to summarize your changes in a couple of sentences aimed toward Druid users. 
   
   If your change doesn't have end user impact, you can skip this section.
   
   For tips about how to write a good release note, see [Release notes](https://github.com/apache/druid/blob/master/CONTRIBUTING.md#release-notes).
   
   -->
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `DeterminePartitionsJobSampler`
    * `DeterminePartitionsJob`
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1186981284


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   The DeterminePartitionsGroupByReducer does de-duplication, so it's OK to emit duplicates in the mapper, as before.
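   A hedged, single-process stand-in for this interaction (the real code runs as a Hadoop MR job; class and method names here are illustrative): since the sampling decision is a function of the group key alone, every duplicate of a selected key is emitted by the mapper, and the reducer's de-duplication works unchanged on the sampled stream.

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   // Simulates mapper-side hash-and-mod sampling followed by reducer-side
   // de-duplication (mirroring DeterminePartitionsGroupByReducer).
   public class SampleThenDedup
   {
     // Same decision for every duplicate of a key: hash the key, mod the factor.
     public static boolean shouldEmitRow(byte[] keyBytes, int samplingFactor)
     {
       return samplingFactor == 1
              || Math.floorMod(Arrays.hashCode(keyBytes), samplingFactor) == 0;
     }

     // "Mapper" emits sampled rows, duplicates included; the set plays the
     // "reducer", collapsing duplicates of each surviving key.
     public static Set<String> sampleThenDedup(List<String> rows, int samplingFactor)
     {
       Set<String> distinctKeys = new LinkedHashSet<>();
       for (String row : rows) {
         if (shouldEmitRow(row.getBytes(), samplingFactor)) {
           distinctKeys.add(row);
         }
       }
       return distinctKeys;
     }

     public static void main(String[] args)
     {
       List<String> rows = List.of("k1", "k1", "k2", "k2", "k3");
       // With factor 1 nothing is dropped; duplicates still collapse to one row each.
       System.out.println(sampleThenDedup(rows, 1)); // [k1, k2, k3]
     }
   }
   ```

   Every key in the de-duplicated output passes the same `shouldEmitRow` test, so sampling before de-duplication yields the same distinct keys as de-duplicating first and sampling after.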





[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1188716167


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   Yes, you are right: we could also use hash-and-mod for the assume-grouped case, but I used random sampling for convenience, and it may be faster than hashing when random sampling is acceptable.





[GitHub] [druid] kfaraz commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1184541081


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   Could you please elaborate on how this helps tackle duplicates? 
   
   IIUC, this would only emit group keys whose hash modulo comes out to be 0. The other group keys would never be emitted. For a group key that would be emitted, you would still emit duplicates.





[GitHub] [druid] kfaraz commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1449656454

   Thanks for the changes, @hqx871 , I will try to get this reviewed soon.




Re: [PR] add sampling factor for DeterminePartitionsJob (druid)

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz merged PR #13840:
URL: https://github.com/apache/druid/pull/13840




[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190648000


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   Using hash-based sampling before the de-duplication job should make the de-duplication job faster and also ensure its result is not biased.







[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1187003374


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java:
##########
@@ -130,7 +142,8 @@ public HadoopTuningConfig(
       final @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       final @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       final @JsonProperty("useYarnRMJobStatusFallback") @Nullable Boolean useYarnRMJobStatusFallback,
-      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis
+      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
+      final @JsonProperty("samplingFactor") int samplingFactor

Review Comment:
   Done





[GitHub] [druid] kfaraz commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1184541081


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   Could you please elaborate on how this helps tackle duplicates? 
   
   IIUC, this would only emit group keys whose hash modulo comes out to be 0. The other group keys would never be emitted. For a group key that would be emitted, you would still emit duplicates.



##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java:
##########
@@ -130,7 +142,8 @@ public HadoopTuningConfig(
       final @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       final @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       final @JsonProperty("useYarnRMJobStatusFallback") @Nullable Boolean useYarnRMJobStatusFallback,
-      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis
+      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
+      final @JsonProperty("samplingFactor") int samplingFactor

Review Comment:
   Maybe rename this config to `determinePartitionsSamplingFactor` to avoid ambiguity.



##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DeterminePartitionsJobSampler
+{
+  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
+
+  private final int samplingFactor;
+
+  private final int sampledTargetPartitionSize;
+
+  private final int sampledMaxRowsPerSegment;
+
+  public DeterminePartitionsJobSampler(int samplingFactor, int targetPartitionSize, int maxRowsPerSegment)
+  {
+    this.samplingFactor = Math.max(samplingFactor, 1);
+    this.sampledTargetPartitionSize = targetPartitionSize / this.samplingFactor;
+    this.sampledMaxRowsPerSegment = maxRowsPerSegment / this.samplingFactor;
+  }
+
+  /**
+   * If input rows is duplicate, we can use hash and mod to do sample. As we hash on whole group key,
+   * there will not likely data skew if the hash function is balanced enough.
+   */
+  boolean shouldEmitRow(byte[] groupKeyBytes)
+  {
+    return samplingFactor == 1 || HASH_FUNCTION.hashBytes(groupKeyBytes).asInt() % samplingFactor == 0;

Review Comment:
   For de-duplication, you can look at `DedupInputRowFilter` in `PartialDimensionDistributionTask`.



##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java:
##########
@@ -130,7 +142,8 @@ public HadoopTuningConfig(
       final @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       final @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       final @JsonProperty("useYarnRMJobStatusFallback") @Nullable Boolean useYarnRMJobStatusFallback,
-      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis
+      final @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
+      final @JsonProperty("samplingFactor") int samplingFactor

Review Comment:
   Also, I think the property should be nullable. If not specified, it would default to 1.
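   The suggested change could look roughly like the sketch below; `TuningConfigSketch` is a hypothetical class showing only the null-to-default handling, not the actual `HadoopTuningConfig` constructor:

```java
public class TuningConfigSketch {
    public static final int DEFAULT_SAMPLING_FACTOR = 1;

    private final int samplingFactor;

    // Sketch of the suggestion: accept a boxed Integer so the JSON property may be
    // omitted, and fall back to 1 (sampling disabled) when it is null.
    public TuningConfigSketch(Integer samplingFactor) {
        this.samplingFactor = samplingFactor == null ? DEFAULT_SAMPLING_FACTOR : samplingFactor;
    }

    public int getSamplingFactor() {
        return samplingFactor;
    }

    public static void main(String[] args) {
        System.out.println(new TuningConfigSketch(null).getSamplingFactor()); // 1
        System.out.println(new TuningConfigSketch(10).getSamplingFactor());   // 10
    }
}
```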



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190644281


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   If the input rows are not assumeGrouped, DeterminePartitionsJob needs to run a de-duplication MR job before launching the sorting MR job. So when sampling, we should make sure the rows after de-duplication are unbiased, not the rows before the de-duplication job. For that case, the hash-based method introduces no bias, but random sampling does.
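   The bias argument can be illustrated with made-up data: with hash-based sampling, every duplicate of a key makes the same decision, so sampling before or after de-duplication yields the same distinct-key set, while per-row random sampling over-selects heavily duplicated keys. `SamplingBiasDemo` below is a toy illustration, with `String.hashCode` standing in for murmur3:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class SamplingBiasDemo {
    // Deterministic per-key decision; String.hashCode stands in for murmur3.
    public static boolean hashEmit(String key, int factor) {
        return key.hashCode() % factor == 0;
    }

    public static Set<String> hashSample(Collection<String> rows, int factor) {
        Set<String> sampled = new HashSet<>();
        for (String row : rows) {
            if (hashEmit(row, factor)) {
                sampled.add(row);
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        // Made-up input: 1000 distinct keys, the first 50 duplicated 1000 extra times.
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add("key-" + i);
            if (i < 50) {
                for (int d = 0; d < 1000; d++) {
                    rows.add("key-" + i);
                }
            }
        }

        // Hash-based: sampling before or after de-duplication selects the same keys.
        System.out.println(hashSample(rows, 10).equals(hashSample(new HashSet<>(rows), 10))); // true

        // Random per-row sampling before de-duplication: a key duplicated 1000 times
        // survives with probability ~1 - 0.9^1001, so nearly all hot keys get through.
        Random random = new Random(42);
        Set<String> randomSample = new HashSet<>();
        for (String row : rows) {
            if (random.nextInt(10) == 0) {
                randomSample.add(row);
            }
        }
        long hotSurvivors = randomSample.stream()
            .filter(key -> Integer.parseInt(key.substring(4)) < 50)
            .count();
        System.out.println("hot keys surviving random sampling: " + hotSurvivors + " of 50");
    }
}
```

   After de-duplication, the random sample's distinct keys skew toward the duplicated ones, while the hash-based sample stays a fixed ~1/factor slice of all distinct keys.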





[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190644281


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   If the input rows are not assumeGrouped, DeterminePartitionsJob needs to run a distinct MR job before launching the sorting MR job. So we should consider the rows after the distinct step; for that case, the hash-based method introduces no bias, but random sampling does.





Re: [PR] add sampling factor for DeterminePartitionsJob (druid)

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1674218285

   @hqx871 , I am merging this PR for now as it is functionally correct. We can do further improvements as needed with some benchmarking. Thanks for the contribution and for your patience!




[GitHub] [druid] kfaraz commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1184559655


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java:
##########

Review Comment:
   Also, I think the property should be nullable. If not specified, it would default to 1.





[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190644281


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   PartialDimensionDistributionTask uses a sketch to get quantiles for shards; that differs from DeterminePartitionsJob, which uses MR jobs to do distinct and sort to find the upper and lower bounds for shards. Am I right?





[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190644281


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   If the input rows are not assumeGrouped, DeterminePartitionsJob needs to run a distinct MR job before launching the sorting MR job. So we should make sure the rows after the distinct step are unbiased when sampled, not the rows before the distinct job. For that case, the hash-based method introduces no bias, but random sampling does.





[GitHub] [druid] hqx871 commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1546759933

   Just considering the PartialDimensionDistributionTask: if we apply the sampling method before the DedupInputRowFilter, we should use the hash-based sampling method to avoid bias.




[GitHub] [druid] hqx871 commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1546756597

   After reading the PartialDimensionDistributionTask more, I find it uses a Bloom filter to do de-duplication.
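   As a rough illustration of that approach (not Druid's actual filter, which sizes a real Bloom filter from expected insertions), `BloomDedupSketch` below shows the first-seen test with a toy two-hash filter; all names are hypothetical:

```java
import java.util.BitSet;

public class BloomDedupSketch {
    private final BitSet bits;
    private final int size;

    public BloomDedupSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two cheap hash positions per key; a real Bloom filter derives the bit-array
    // size and hash count from expected insertions and a target false-positive rate.
    private int h1(String key) {
        return Math.floorMod(key.hashCode(), size);
    }

    private int h2(String key) {
        return Math.floorMod(Integer.reverse(key.hashCode()), size);
    }

    /**
     * Returns true the first time a key is seen. False positives are possible,
     * false negatives are not, so true duplicates are always caught.
     */
    public boolean markAndTestFirstSeen(String key) {
        boolean possiblySeen = bits.get(h1(key)) && bits.get(h2(key));
        bits.set(h1(key));
        bits.set(h2(key));
        return !possiblySeen;
    }

    public static void main(String[] args) {
        BloomDedupSketch filter = new BloomDedupSketch(1 << 20);
        System.out.println(filter.markAndTestFirstSeen("row-a")); // true: first sighting
        System.out.println(filter.markAndTestFirstSeen("row-a")); // false: duplicate
    }
}
```

   The trade-off is constant memory in exchange for occasionally dropping a never-seen row as a "duplicate", which is acceptable when only estimating a distribution.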




[GitHub] [druid] kfaraz commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1543319247

   @hqx871 , let me know if you have given some thought to this.
   
   > 
   > Ideally, for determining partitions, I would have liked to do what we do with `PartialDimensionDistributionTask` in native batch using `StringSketch` and `StringSketchMerger`. It would probably go something like this:
   > 
   >     * Each mapper emits a `StringDistribution` for each `Interval` that it encounters.
   > 
   >     * The reducer merges the distributions received for each `Interval` using `StringSketchMerger`.
   > 
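   That suggested flow can be sketched in a much-simplified form, with plain sorted samples standing in for `StringSketch`/`StringSketchMerger` (the real sketches keep bounded memory); all names below are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartitionBoundarySketch {
    /** Stand-in "mapper output": a sorted sample of the keys one mapper saw. */
    public static List<String> mapperDistribution(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    /** Stand-in "reducer": merge per-mapper samples and cut them into even ranges. */
    public static List<String> mergeAndCut(List<List<String>> distributions, int partitions) {
        List<String> merged = new ArrayList<>();
        for (List<String> distribution : distributions) {
            merged.addAll(distribution);
        }
        Collections.sort(merged);
        List<String> boundaries = new ArrayList<>();
        for (int p = 1; p < partitions; p++) {
            boundaries.add(merged.get(p * merged.size() / partitions));
        }
        return boundaries;
    }

    public static void main(String[] args) {
        List<List<String>> perMapper = new ArrayList<>();
        perMapper.add(mapperDistribution(List.of("a", "c", "e", "g")));
        perMapper.add(mapperDistribution(List.of("b", "d", "f", "h")));
        // Two partitions -> one boundary at the median of the merged sample.
        System.out.println(mergeAndCut(perMapper, 2)); // [e]
    }
}
```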
   
   
   




[GitHub] [druid] kfaraz commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1190621827


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   No, I meant to say that random is better for both assumeGrouped true and false.
   Sampling based on the hash value of the keys is going to be inherently biased.





[GitHub] [druid] hqx871 commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1188716167


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   Yes, you are right, we can use hash and mod for the assumeGrouped case, but I use random for convenience, and it may be faster than hashing if the random method is acceptable.





[GitHub] [druid] kfaraz commented on a diff in pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13840:
URL: https://github.com/apache/druid/pull/13840#discussion_r1187386164


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJobSampler.java:
##########

Review Comment:
   In that case, why do we need separate sampling methods based on the values of assumeGrouped? In other words, why do we need to use the hash of the group key?





[GitHub] [druid] hqx871 commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1543350313

   PartialDimensionDistributionTask uses a sketch to get quantiles for shards; that differs from DeterminePartitionsJob, which uses MR jobs to do distinct and sort to find the upper and lower bounds for shards. Am I right?




[GitHub] [druid] hqx871 commented on pull request #13840: add sampling factor for DeterminePartitionsJob

Posted by "hqx871 (via GitHub)" <gi...@apache.org>.
hqx871 commented on PR #13840:
URL: https://github.com/apache/druid/pull/13840#issuecomment-1543359087

   I have not read much about the quantile sketch method, but it seems to do de-duplication itself and does not need to launch another de-duplication job.

