You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/16 16:13:47 UTC
[beam] branch master updated: [BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d0b9fbc [BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)
d0b9fbc is described below
commit d0b9fbc898753a9d3bd44fc84d38880bd83a32a7
Author: Sam sam <ro...@gmail.com>
AuthorDate: Fri Nov 16 08:13:40 2018 -0800
[BEAM-5674] Add withKeyType to DataflowRunner.Deduplicate internals to provide a coder (#7054)
---
.../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 680c348..9eb0cfc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -153,6 +153,7 @@ import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.DateTimeUtils;
@@ -1494,7 +1495,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
return input
- .apply(WithKeys.of(value -> Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS))
+ .apply(
+ WithKeys.of(
+ (ValueWithRecordId<T> value) ->
+ Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS)
+ .withKeyType(TypeDescriptors.integers()))
// Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
// WindmillSink.
.apply(Reshuffle.of())