You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:47:42 UTC

[08/50] incubator-beam git commit: Remove OldDoFn to DoFn in DirectRunner

Remove OldDoFn to DoFn in DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/28720191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/28720191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/28720191

Branch: refs/heads/apex-runner
Commit: 287201916776cb51b98b9ddd27c169f87bb89e1a
Parents: 3d08685
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 11:18:10 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 13:12:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/WriteWithShardingFactory.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/28720191/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d74cd56..8727cb5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -25,9 +25,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -102,7 +102,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
   }
 
   @VisibleForTesting
-  static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
+  static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {
     @VisibleForTesting
     static final int MIN_SHARDS_FOR_LOG = 3;
 
@@ -116,7 +116,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
       this.randomExtraShards = extraShards;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (maxShards == 0) {
         maxShards = calculateShards(c.sideInput(numRecords));