You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/16 16:13:47 UTC

[beam] branch master updated: [BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d0b9fbc  [BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)
d0b9fbc is described below

commit d0b9fbc898753a9d3bd44fc84d38880bd83a32a7
Author: Sam sam <ro...@gmail.com>
AuthorDate: Fri Nov 16 08:13:40 2018 -0800

    [BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)
---
 .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 680c348..9eb0cfc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -153,6 +153,7 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
@@ -1494,7 +1495,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Override
     public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
-          .apply(WithKeys.of(value -> Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS))
+          .apply(
+              WithKeys.of(
+                      (ValueWithRecordId<T> value) ->
+                          Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS)
+                  .withKeyType(TypeDescriptors.integers()))
           // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
           // WindmillSink.
           .apply(Reshuffle.of())