You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/16 21:22:59 UTC
[1/4] incubator-beam git commit: This closes #1319
Repository: incubator-beam
Updated Branches:
refs/heads/master c695ef48b -> 15e93c58e
This closes #1319
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e93c58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e93c58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e93c58
Branch: refs/heads/master
Commit: 15e93c58e933ef913c89a9654ceb567299a2fe5b
Parents: c695ef4 bf4c504
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 13:22:41 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 3 +-
.../beam/runners/direct/TransformExecutor.java | 23 -----------
.../runners/direct/TransformExecutorTest.java | 43 --------------------
.../apache/beam/sdk/util/TimerInternals.java | 13 ++++--
4 files changed, 11 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Reduce Visibility of
PCollectionViewWriter
Posted by tg...@apache.org.
Reduce Visibility of PCollectionViewWriter
This is an internal implementation detail of the DirectRunner and should
not be exposed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf4c504d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf4c504d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf4c504d
Branch: refs/heads/master
Commit: bf4c504dffa05c8eccc5c2ae16dffdd53418a468
Parents: 5ba4d18
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 10:37:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/runners/direct/DirectRunner.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf4c504d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index c9a7864..04c8eb6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -183,10 +183,11 @@ public class DirectRunner
/**
* A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
* a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+ *
* @param <ElemT> the type of elements the input {@link PCollection} contains.
* @param <ViewT> the type of the PCollectionView this writer writes to.
*/
- public interface PCollectionViewWriter<ElemT, ViewT> {
+ interface PCollectionViewWriter<ElemT, ViewT> {
void add(Iterable<WindowedValue<ElemT>> values);
}
[2/4] incubator-beam git commit: Remove unused Thread variable in
TransformExecutor
Posted by tg...@apache.org.
Remove unused Thread variable in TransformExecutor
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ba4d181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ba4d181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ba4d181
Branch: refs/heads/master
Commit: 5ba4d181d4625d43078bb0b071635d563d925277
Parents: 3e6a4f4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 8 14:16:23 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/TransformExecutor.java | 23 -----------
.../runners/direct/TransformExecutorTest.java | 43 --------------------
2 files changed, 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index c4002b5..1704955 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,13 +17,9 @@
*/
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkState;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -70,8 +66,6 @@ class TransformExecutor<T> implements Runnable {
private final TransformExecutorService transformEvaluationState;
private final EvaluationContext context;
- private final AtomicReference<Thread> thread;
-
private TransformExecutor(
EvaluationContext context,
TransformEvaluatorFactory factory,
@@ -90,20 +84,12 @@ class TransformExecutor<T> implements Runnable {
this.transformEvaluationState = transformEvaluationState;
this.context = context;
- this.thread = new AtomicReference<>();
}
@Override
public void run() {
MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
MetricsEnvironment.setMetricsContainer(metricsContainer);
- checkState(
- thread.compareAndSet(null, Thread.currentThread()),
- "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
- TransformExecutor.class.getSimpleName(),
- transform.getFullName(),
- Thread.currentThread(),
- thread.get());
try {
Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -186,13 +172,4 @@ class TransformExecutor<T> implements Runnable {
}
return result;
}
-
- /**
- * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
- * Otherwise, return null.
- */
- @Nullable
- public Thread getThread() {
- return thread.get();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 32f874d..0b7b882 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
@@ -277,48 +276,6 @@ public class TransformExecutorTest {
}
@Test
- public void duringCallGetThreadIsNonNull() throws Exception {
- final TransformResult result =
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
- final CountDownLatch testLatch = new CountDownLatch(1);
- final CountDownLatch evaluatorLatch = new CountDownLatch(1);
- TransformEvaluator<Object> evaluator =
- new TransformEvaluator<Object>() {
- @Override
- public void processElement(WindowedValue<Object> element) throws Exception {
- throw new IllegalArgumentException("Shouldn't be called");
- }
-
- @Override
- public TransformResult finishBundle() throws Exception {
- testLatch.countDown();
- evaluatorLatch.await();
- return result;
- }
- };
-
- when(registry.forApplication(created.getProducingTransformInternal(), null))
- .thenReturn(evaluator);
-
- TransformExecutor<String> executor =
- TransformExecutor.create(
- evaluationContext,
- registry,
- Collections.<ModelEnforcementFactory>emptyList(),
- null,
- created.getProducingTransformInternal(),
- completionCallback,
- transformEvaluationState);
-
- Executors.newSingleThreadExecutor().submit(executor);
- testLatch.await();
- assertThat(executor.getThread(), not(nullValue()));
-
- // Finish the execution so everything can get closed down cleanly.
- evaluatorLatch.countDown();
- }
-
- @Test
public void callWithEnforcementAppliesEnforcement() throws Exception {
final TransformResult result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
[4/4] incubator-beam git commit: Reduce incidence of Namespace
StringKey comparisons
Posted by tg...@apache.org.
Reduce incidence of Namespace StringKey comparisons
If the Namespace of a TimerData reports itself as being equal to the
other namespace, immediately return 0 rather than generating the string
keys and comparing them.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e6a4f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e6a4f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e6a4f4a
Branch: refs/heads/master
Commit: 3e6a4f4a49344871430d2711e934b0493c17499f
Parents: c695ef4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 8 14:18:58 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/TimerInternals.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e6a4f4a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 743f3f7..5d4a72d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -230,12 +230,17 @@ public interface TimerInternals {
* arbitrary.
*/
@Override
- public int compareTo(TimerData o) {
+ public int compareTo(TimerData that) {
+ if (this.equals(that)) {
+ return 0;
+ }
ComparisonChain chain =
- ComparisonChain.start().compare(timestamp, o.getTimestamp()).compare(domain, o.domain);
- if (chain.result() == 0) {
+ ComparisonChain.start()
+ .compare(this.timestamp, that.getTimestamp())
+ .compare(this.domain, that.domain);
+ if (chain.result() == 0 && !this.namespace.equals(that.namespace)) {
// Obtaining the stringKey may be expensive; only do so if required
- chain = chain.compare(namespace.stringKey(), o.namespace.stringKey());
+ chain = chain.compare(namespace.stringKey(), that.namespace.stringKey());
}
return chain.result();
}