You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/25 19:11:47 UTC
[3/4] incubator-beam git commit: Tidy WriteWithShardingFactory
Tidy WriteWithShardingFactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20244bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20244bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20244bad
Branch: refs/heads/master
Commit: 20244badc3d1b8defd9e5b9a718f54475c502365
Parents: cf14644
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 19 19:16:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/WriteWithShardingFactory.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20244bad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 93f2408..d6ee6ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -78,7 +78,8 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
- final PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView());
+ final PCollectionView<Long> numRecords = records
+ .apply("CountRecords", Count.<T>globally().asSingletonView());
PCollection<T> resharded =
records
.apply(
@@ -107,7 +108,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
private final PCollectionView<Long> numRecords;
private final int randomExtraShards;
private int currentShard;
- private int maxShards;
+ private int maxShards = 0;
KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
this.numRecords = numRecords;
@@ -116,7 +117,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
@Override
public void processElement(ProcessContext c) throws Exception {
- if (maxShards == 0L) {
+ if (maxShards == 0) {
maxShards = calculateShards(c.sideInput(numRecords));
currentShard = ThreadLocalRandom.current().nextInt(maxShards);
}