You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/20 17:56:03 UTC
[1/2] incubator-beam git commit: Use the ParDo Application to Cache DoFns
Repository: incubator-beam
Updated Branches:
refs/heads/master f547f70e1 -> 436e4a34e
Use the ParDo Application to Cache DoFns
A DoFn application is the scope of reuse.
Factor CloningThreadLocal as the top-level class instead of
SerializableCloningThreadLocalCacheLoader, and extract the Fn from the
AppliedPTransform when loading an absent element.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00195d25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00195d25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00195d25
Branch: refs/heads/master
Commit: 00195d2543eb347cc3669a4ac89e98da0bc4dca4
Parents: f0119b2
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 28 15:44:49 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 20 10:55:32 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CloningThreadLocal.java | 43 +++++++++
.../direct/ParDoMultiEvaluatorFactory.java | 44 ++++++---
.../direct/ParDoSingleEvaluatorFactory.java | 50 ++++++----
...rializableCloningThreadLocalCacheLoader.java | 54 -----------
.../runners/direct/CloningThreadLocalTest.java | 92 ++++++++++++++++++
...izableCloningThreadLocalCacheLoaderTest.java | 99 --------------------
6 files changed, 198 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
new file mode 100644
index 0000000..b9dc4ca
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import java.io.Serializable;
+
+/**
+ * A {@link ThreadLocal} that obtains the initial value by cloning an original value.
+ */
+class CloningThreadLocal<T extends Serializable> extends ThreadLocal<T> {
+ public static <T extends Serializable> CloningThreadLocal<T> of(T original) {
+ return new CloningThreadLocal<>(original);
+ }
+
+ private final T original;
+
+ private CloningThreadLocal(T original) {
+ this.original = original;
+ }
+
+ @Override
+ public T initialValue() {
+ return SerializableUtils.clone(original);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index e008bdc..b87cd3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Map;
@@ -36,11 +37,24 @@ import java.util.Map;
* {@link BoundMulti} primitive {@link PTransform}.
*/
class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
- private final LoadingCache<DoFn<?, ?>, ThreadLocal<DoFn<?, ?>>> fnClones;
+ private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+ fnClones;
public ParDoMultiEvaluatorFactory() {
- fnClones = CacheBuilder.newBuilder()
- .build(SerializableCloningThreadLocalCacheLoader.<DoFn<?, ?>>create());
+ fnClones =
+ CacheBuilder.newBuilder()
+ .build(
+ new CacheLoader<
+ AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+ @Override
+ public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
+ throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ThreadLocal threadLocal =
+ (ThreadLocal) CloningThreadLocal.of(key.getTransform().getFn());
+ return threadLocal;
+ }
+ });
}
@Override
@@ -59,19 +73,21 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
CommittedBundle<InT> inputBundle,
EvaluationContext evaluationContext) {
Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
- DoFn<InT, OuT> fn = application.getTransform().getFn();
- @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InT, OuT>> fnLocal =
- (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ThreadLocal<DoFn<InT, OuT>> fnLocal =
+ (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
try {
- TransformEvaluator<InT> parDoEvaluator = ParDoEvaluator.create(evaluationContext,
- inputBundle,
- application,
- fnLocal.get(),
- application.getTransform().getSideInputs(),
- application.getTransform().getMainOutputTag(),
- application.getTransform().getSideOutputTags().getAll(),
- outputs);
+ TransformEvaluator<InT> parDoEvaluator =
+ ParDoEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ fnLocal.get(),
+ application.getTransform().getSideInputs(),
+ application.getTransform().getMainOutputTag(),
+ application.getTransform().getSideOutputTags().getAll(),
+ outputs);
return ThreadLocalInvalidatingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
} catch (Exception e) {
fnLocal.remove();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 0f7fc83..e9c7dd6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
@@ -36,11 +37,23 @@ import java.util.Collections;
* {@link Bound ParDo.Bound} primitive {@link PTransform}.
*/
class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
- private final LoadingCache<DoFn<?, ?>, ThreadLocal<DoFn<?, ?>>> fnClones;
+ private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+ fnClones;
public ParDoSingleEvaluatorFactory() {
- fnClones = CacheBuilder.newBuilder()
- .build(SerializableCloningThreadLocalCacheLoader.<DoFn<?, ?>>create());
+ fnClones =
+ CacheBuilder.newBuilder()
+ .build(
+ new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+ @Override
+ public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key)
+ throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ThreadLocal threadLocal =
+ (ThreadLocal) CloningThreadLocal.of(key.getTransform().getFn());
+ return threadLocal;
+ }
+ });
}
@Override
@@ -55,23 +68,26 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
}
private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
- @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
- Bound<InputT, OutputT>> application,
- CommittedBundle<InputT> inputBundle, EvaluationContext evaluationContext) {
+ AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>>
+ application,
+ CommittedBundle<InputT> inputBundle,
+ EvaluationContext evaluationContext) {
TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
- @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
- (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
+ (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
try {
- ParDoEvaluator<InputT> parDoEvaluator = ParDoEvaluator.create(
- evaluationContext,
- inputBundle,
- application,
- fnLocal.get(),
- application.getTransform().getSideInputs(),
- mainOutputTag,
- Collections.<TupleTag<?>>emptyList(),
- ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+ ParDoEvaluator<InputT> parDoEvaluator =
+ ParDoEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ fnLocal.get(),
+ application.getTransform().getSideInputs(),
+ mainOutputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
return ThreadLocalInvalidatingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
} catch (Exception e) {
fnLocal.remove();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
deleted file mode 100644
index a3703d9..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.util.SerializableUtils;
-
-import com.google.common.cache.CacheLoader;
-
-import java.io.Serializable;
-
-/**
- * A {@link CacheLoader} that loads {@link ThreadLocal ThreadLocals} with initial values equal to
- * the clone of the key.
- */
-class SerializableCloningThreadLocalCacheLoader<T extends Serializable>
- extends CacheLoader<T, ThreadLocal<T>> {
- public static <T extends Serializable> CacheLoader<T, ThreadLocal<T>> create() {
- return new SerializableCloningThreadLocalCacheLoader<T>();
- }
-
- @Override
- public ThreadLocal<T> load(T key) throws Exception {
- return new CloningThreadLocal<>(key);
- }
-
- private static class CloningThreadLocal<T extends Serializable> extends ThreadLocal<T> {
- private final T original;
-
- public CloningThreadLocal(T value) {
- this.original = value;
- }
-
- @Override
- public T initialValue() {
- return SerializableUtils.clone(original);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
new file mode 100644
index 0000000..298db46
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.IsNot.not;
+import static org.hamcrest.core.IsSame.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link CloningThreadLocal}.
+ */
+@RunWith(JUnit4.class)
+public class CloningThreadLocalTest {
+ @Test
+ public void returnsCopiesOfOriginal() throws Exception {
+ Record original = new Record();
+ ThreadLocal<Record> loaded = CloningThreadLocal.of(original);
+ assertThat(loaded.get(), not(nullValue()));
+ assertThat(loaded.get(), equalTo(original));
+ assertThat(loaded.get(), not(theInstance(original)));
+ }
+
+ @Test
+ public void returnsDifferentCopiesInDifferentThreads() throws Exception {
+ Record original = new Record();
+ final ThreadLocal<Record> loaded = CloningThreadLocal.of(original);
+ assertThat(loaded.get(), not(nullValue()));
+ assertThat(loaded.get(), equalTo(original));
+ assertThat(loaded.get(), not(theInstance(original)));
+
+ Callable<Record> otherThread =
+ new Callable<Record>() {
+ @Override
+ public Record call() throws Exception {
+ return loaded.get();
+ }
+ };
+ Record sameThread = loaded.get();
+ Record firstOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
+ Record secondOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
+
+ assertThat(sameThread, equalTo(firstOtherThread));
+ assertThat(sameThread, equalTo(secondOtherThread));
+ assertThat(sameThread, not(theInstance(firstOtherThread)));
+ assertThat(sameThread, not(theInstance(secondOtherThread)));
+ assertThat(firstOtherThread, not(theInstance(secondOtherThread)));
+ }
+
+ private static class Record implements Serializable {
+ private final double rand = Math.random();
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Record)) {
+ return false;
+ }
+ Record that = (Record) other;
+ return this.rand == that.rand;
+ }
+
+ @Override
+ public int hashCode() {
+ return 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
deleted file mode 100644
index c451eec..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.core.IsNot.not;
-import static org.hamcrest.core.IsSame.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-
-/**
- * Tests for {@link SerializableCloningThreadLocalCacheLoader}.
- */
-@RunWith(JUnit4.class)
-public class SerializableCloningThreadLocalCacheLoaderTest {
- private SerializableCloningThreadLocalCacheLoader<Record> loader;
-
- @Before
- public void setup() {
- loader = new SerializableCloningThreadLocalCacheLoader();
- }
-
- @Test
- public void returnsCopiesOfOriginal() throws Exception {
- Record original = new Record();
- ThreadLocal<Record> loaded = loader.load(original);
- assertThat(loaded.get(), not(nullValue()));
- assertThat(loaded.get(), equalTo(original));
- assertThat(loaded.get(), not(theInstance(original)));
- }
-
- @Test
- public void returnsDifferentCopiesInDifferentThreads() throws Exception {
- Record original = new Record();
- final ThreadLocal<Record> loaded = loader.load(original);
- assertThat(loaded.get(), not(nullValue()));
- assertThat(loaded.get(), equalTo(original));
- assertThat(loaded.get(), not(theInstance(original)));
-
- Callable<Record> otherThread = new Callable<Record>() {
- @Override
- public Record call() throws Exception {
- return loaded.get();
- }
- };
- Record sameThread = loaded.get();
- Record firstOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
- Record secondOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
-
- assertThat(sameThread, equalTo(firstOtherThread));
- assertThat(sameThread, equalTo(secondOtherThread));
- assertThat(sameThread, not(theInstance(firstOtherThread)));
- assertThat(sameThread, not(theInstance(secondOtherThread)));
- assertThat(firstOtherThread, not(theInstance(secondOtherThread)));
- }
-
- private static class Record implements Serializable {
- private final double rand = Math.random();
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Record)) {
- return false;
- }
- Record that = (Record) other;
- return this.rand == that.rand;
- }
-
- @Override
- public int hashCode() {
- return 1;
- }
- }
-}
[2/2] incubator-beam git commit: This closes #554
Posted by ke...@apache.org.
This closes #554
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/436e4a34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/436e4a34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/436e4a34
Branch: refs/heads/master
Commit: 436e4a34ebb222545cb03cb6d39ea4ca2d905254
Parents: f547f70 00195d2
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 20 10:55:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 20 10:55:53 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CloningThreadLocal.java | 43 +++++++++
.../direct/ParDoMultiEvaluatorFactory.java | 44 ++++++---
.../direct/ParDoSingleEvaluatorFactory.java | 50 ++++++----
...rializableCloningThreadLocalCacheLoader.java | 54 -----------
.../runners/direct/CloningThreadLocalTest.java | 92 ++++++++++++++++++
...izableCloningThreadLocalCacheLoaderTest.java | 99 --------------------
6 files changed, 198 insertions(+), 184 deletions(-)
----------------------------------------------------------------------