You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/25 19:11:47 UTC

[3/4] incubator-beam git commit: Tidy WriteWithShardingFactory

Tidy WriteWithShardingFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20244bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20244bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20244bad

Branch: refs/heads/master
Commit: 20244badc3d1b8defd9e5b9a718f54475c502365
Parents: cf14644
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 19 19:16:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/WriteWithShardingFactory.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20244bad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 93f2408..d6ee6ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -78,7 +78,8 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
           Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
               .withAllowedLateness(Duration.ZERO)
               .discardingFiredPanes());
-      final PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView());
+      final PCollectionView<Long> numRecords = records
+          .apply("CountRecords", Count.<T>globally().asSingletonView());
       PCollection<T> resharded =
           records
               .apply(
@@ -107,7 +108,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
     private final PCollectionView<Long> numRecords;
     private final int randomExtraShards;
     private int currentShard;
-    private int maxShards;
+    private int maxShards = 0;
 
     KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
       this.numRecords = numRecords;
@@ -116,7 +117,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      if (maxShards == 0L) {
+      if (maxShards == 0) {
         maxShards = calculateShards(c.sideInput(numRecords));
         currentShard = ThreadLocalRandom.current().nextInt(maxShards);
       }