You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/27 21:00:55 UTC
[1/2] incubator-beam git commit: Create DoFnInvoker instances in the
package of the invoked DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master 0866e4906 -> 2cbac53c5
Create DoFnInvoker instances in the package of the invoked DoFn
The DoFnInvoker implementation for a DoFn carries a field
with the type of the invoked DoFn, and that type needs to be
visible.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/681ad896
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/681ad896
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/681ad896
Branch: refs/heads/master
Commit: 681ad8968ef68a309e118ae0d87635535ec26b3f
Parents: e8695a1
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 26 21:40:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 27 14:00:05 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/DoFnReflector.java | 20 +++-
.../beam/sdk/transforms/DoFnReflectorTest.java | 75 +++++++++++-
.../dofnreflector/DoFnReflectorTestHelper.java | 116 +++++++++++++++++++
3 files changed, 203 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 116b64d..0616eff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -42,16 +42,18 @@ import com.google.common.reflect.TypeParameter;
import com.google.common.reflect.TypeToken;
import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.NamingStrategy.SuffixingRandom;
import net.bytebuddy.description.field.FieldDescription;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.method.ParameterList;
import net.bytebuddy.description.modifier.FieldManifestation;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeDescription.Generic;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
import net.bytebuddy.implementation.Implementation;
import net.bytebuddy.implementation.MethodCall.MethodLocator;
import net.bytebuddy.implementation.StubMethod;
@@ -70,7 +72,6 @@ import net.bytebuddy.jar.asm.Label;
import net.bytebuddy.jar.asm.MethodVisitor;
import net.bytebuddy.jar.asm.Opcodes;
import net.bytebuddy.matcher.ElementMatchers;
-
import org.joda.time.Instant;
import java.io.IOException;
@@ -89,7 +90,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-
import javax.annotation.Nullable;
@@ -499,8 +499,20 @@ public abstract class DoFnReflector {
*/
private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
@SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> clazz) {
+
+ final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
+
DynamicType.Builder<?> builder = new ByteBuddy()
- .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+ // Create subclasses in the package of the target class, to have access to
+ // private and package-private bits
+ .with(new SuffixingRandom("auxiliary") {
+ @Override
+ public String subclass(Generic superClass) {
+ return super.name(clazzDescription);
+ }
+ })
+ // Create a subclass of DoFnInvoker
+ .subclass(DoFnInvoker.class, Default.NO_CONSTRUCTORS)
.defineField(FN_DELEGATE_FIELD_NAME, clazz, Visibility.PRIVATE, FieldManifestation.FINAL)
// Define a constructor to populate fields appropriately.
.defineConstructor(Visibility.PUBLIC)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index cf9f8e8..3238f2c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFnWithContext.Context;
import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext;
import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
+import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -46,10 +47,13 @@ import java.lang.reflect.Method;
@RunWith(JUnit4.class)
public class DoFnReflectorTest {
- private static class Invocations {
- private boolean wasProcessElementInvoked = false;
- private boolean wasStartBundleInvoked = false;
- private boolean wasFinishBundleInvoked = false;
+ /**
+ * A convenience struct holding flags that indicate whether a particular method was invoked.
+ */
+ public static class Invocations {
+ public boolean wasProcessElementInvoked = false;
+ public boolean wasStartBundleInvoked = false;
+ public boolean wasFinishBundleInvoked = false;
private final String name;
public Invocations(String name) {
@@ -357,6 +361,69 @@ public class DoFnReflectorTest {
});
}
+ private static class PrivateDoFnClass extends DoFnWithContext<String, String> {
+ final Invocations invocations = new Invocations(getClass().getName());
+
+ @ProcessElement
+ public void processThis(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ @Test
+ public void testLocalPrivateDoFnClass() throws Exception {
+ PrivateDoFnClass fn = new PrivateDoFnClass();
+ DoFnReflector reflector = underTest(fn);
+ checkInvokeProcessElementWorks(reflector, fn.invocations);
+ }
+
+ @Test
+ public void testStaticPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPackagePrivateDoFn");
+ DoFnReflector reflector =
+ underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testInnerPackagePrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("InnerPackagePrivateDoFn");
+ DoFnReflector reflector =
+ underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testStaticPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticPrivateDoFn");
+ DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testInnerPrivateDoFnClass() throws Exception {
+ Invocations invocations = new Invocations("StaticInnerDoFn");
+ DoFnReflector reflector =
+ underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testAnonymousInnerDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage");
+ DoFnReflector reflector =
+ underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
+ Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage");
+ DoFnReflector reflector =
+ underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations));
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
@Test
public void testPrivateProcessElement() throws Exception {
thrown.expect(IllegalStateException.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/681ad896/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
new file mode 100644
index 0000000..5ff2bf1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.dofnreflector;
+
+import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations;
+import org.apache.beam.sdk.transforms.DoFnWithContext;
+
+/**
+ * Test helper for DoFnReflectorTest, which needs to test package-private access
+ * to DoFns in other packages.
+ */
+public class DoFnReflectorTestHelper {
+
+ private static class StaticPrivateDoFn extends DoFnWithContext<String, String> {
+ final Invocations invocations;
+
+ public StaticPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ private class InnerPrivateDoFn extends DoFnWithContext<String, String> {
+ final Invocations invocations;
+
+ public InnerPrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ static class StaticPackagePrivateDoFn extends DoFnWithContext<String, String> {
+ final Invocations invocations;
+
+ public StaticPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ class InnerPackagePrivateDoFn extends DoFnWithContext<String, String> {
+ final Invocations invocations;
+
+ public InnerPackagePrivateDoFn(Invocations invocations) {
+ this.invocations = invocations;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ }
+
+ public static DoFnWithContext<String, String> newStaticPackagePrivateDoFn(
+ Invocations invocations) {
+ return new StaticPackagePrivateDoFn(invocations);
+ }
+
+ public DoFnWithContext<String, String> newInnerPackagePrivateDoFn(Invocations invocations) {
+ return new InnerPackagePrivateDoFn(invocations);
+ }
+
+ public static DoFnWithContext<String, String> newStaticPrivateDoFn(Invocations invocations) {
+ return new StaticPrivateDoFn(invocations);
+ }
+
+ public DoFnWithContext<String, String> newInnerPrivateDoFn(Invocations invocations) {
+ return new InnerPrivateDoFn(invocations);
+ }
+
+ public DoFnWithContext<String, String> newInnerAnonymousDoFn(final Invocations invocations) {
+ return new DoFnWithContext<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+
+ public static DoFnWithContext<String, String> newStaticAnonymousDoFn(
+ final Invocations invocations) {
+ return new DoFnWithContext<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ invocations.wasProcessElementInvoked = true;
+ }
+ };
+ }
+}
[2/2] incubator-beam git commit: This closes #738
Posted by ke...@apache.org.
This closes #738
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2cbac53c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2cbac53c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2cbac53c
Branch: refs/heads/master
Commit: 2cbac53c5532b29218aadc351f6b9c87b89aa1e0
Parents: 0866e49 681ad89
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 27 14:00:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 27 14:00:31 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/DoFnReflector.java | 20 +++-
.../beam/sdk/transforms/DoFnReflectorTest.java | 75 +++++++++++-
.../dofnreflector/DoFnReflectorTestHelper.java | 116 +++++++++++++++++++
3 files changed, 203 insertions(+), 8 deletions(-)
----------------------------------------------------------------------