You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/07 00:07:11 UTC

[2/2] beam git commit: Migrate shared Fn Execution code to Java7

Migrate shared Fn Execution code to Java7


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/012c2e64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/012c2e64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/012c2e64

Branch: refs/heads/master
Commit: 012c2e64ceb691fd69649ba8cc02d9a2f16e519f
Parents: 766b4f3
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 18 11:26:48 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 6 16:06:57 2017 -0800

----------------------------------------------------------------------
 runners/java-fn-execution/pom.xml               | 14 ------
 .../runners/fnexecution/ServerFactoryTest.java  | 45 ++++++++++++++-----
 runners/pom.xml                                 |  2 +-
 sdks/java/fn-execution/pom.xml                  | 20 ++-------
 .../org/apache/beam/harness/test/Consumer.java  | 26 +++++++++++
 .../org/apache/beam/harness/test/Supplier.java  | 26 +++++++++++
 .../apache/beam/harness/test/TestExecutors.java | 12 ++++-
 .../beam/harness/test/TestExecutorsTest.java    | 29 +++++++++---
 .../apache/beam/harness/test/TestStreams.java   | 35 ++++++++++++---
 .../beam/harness/test/TestStreamsTest.java      | 47 +++++++++++++++-----
 .../apache/beam/fn/harness/FnHarnessTest.java   |  4 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  |  5 ++-
 .../stream/BufferingStreamObserverTest.java     |  2 +-
 .../stream/DirectStreamObserverTest.java        |  2 +-
 sdks/java/pom.xml                               |  2 +-
 15 files changed, 196 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index bd4fcf0..f57c58b 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,20 +32,6 @@
 
   <packaging>jar</packaging>
 
-  <build>
-    <plugins>
-      <plugin>
-        <!--  Override Beam parent to allow Java8 -->
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.8</source>
-          <target>1.8</target>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index aa8d246..b78e88a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -39,6 +39,7 @@ import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
@@ -74,24 +75,48 @@ public class ServerFactoryTest {
     Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
         Endpoints.ApiServiceDescriptor.newBuilder();
 
-    Collection<Elements> serverElements = new ArrayList<>();
-    CountDownLatch clientHangedUp = new CountDownLatch(1);
+    final Collection<Elements> serverElements = new ArrayList<>();
+    final CountDownLatch clientHangedUp = new CountDownLatch(1);
     CallStreamObserver<Elements> serverInboundObserver =
-        TestStreams.withOnNext(serverElements::add)
-        .withOnCompleted(clientHangedUp::countDown)
-        .build();
+        TestStreams.withOnNext(
+                new Consumer<Elements>() {
+                  @Override
+                  public void accept(Elements item) {
+                    serverElements.add(item);
+                  }
+                })
+            .withOnCompleted(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    clientHangedUp.countDown();
+                  }
+                })
+            .build();
     TestDataService service = new TestDataService(serverInboundObserver);
     Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
     assertFalse(server.isShutdown());
 
     ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
     BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
-    Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
-    CountDownLatch serverHangedUp = new CountDownLatch(1);
+    final Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
+    final CountDownLatch serverHangedUp = new CountDownLatch(1);
     CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
-        TestStreams.withOnNext(clientElements::add)
-        .withOnCompleted(serverHangedUp::countDown)
-        .build();
+        TestStreams.withOnNext(
+                new Consumer<Elements>() {
+                  @Override
+                  public void accept(Elements item) {
+                    clientElements.add(item);
+                  }
+                })
+            .withOnCompleted(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    serverHangedUp.countDown();
+                  }
+                })
+            .build();
 
     StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver);
     StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index df3faa9..47f3c0e 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>core-construction-java</module>
     <module>core-java</module>
+    <module>java-fn-execution</module>
     <module>local-artifact-service-java</module>
     <module>reference</module>
     <module>direct-java</module>
@@ -63,7 +64,6 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
-        <module>java-fn-execution</module>
         <module>gearpump</module>
       </modules>
     </profile>

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index 9929c29..7c203eb 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -27,27 +27,13 @@
   </parent>
 
   <artifactId>beam-sdks-java-fn-execution</artifactId>
-  <name>Apache Beam :: SDKs :: Java :: Harness Core</name>
-  <description>Contains code shared across the Beam Java SDK Harness and the Java Runner Harness
-    libraries.
+  <name>Apache Beam :: SDKs :: Java :: Fn Execution</name>
+  <description>Contains code shared across the Beam Java SDK Harness and Java Runners to execute
+    using the Beam Portability Framework.
   </description>
 
   <packaging>jar</packaging>
 
-  <build>
-    <plugins>
-      <plugin>
-        <!--  Override Beam parent to allow Java8 -->
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.8</source>
-          <target>1.8</target>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
new file mode 100644
index 0000000..279fc29
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+/**
+ * A fork of the Java 8 Consumer interface. This exists to enable migration for existing consumers.
+ */
+public interface Consumer<T> {
+  void accept(T item);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
new file mode 100644
index 0000000..629afc2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+/**
+ * A fork of the Java 8 Supplier interface, to enable migrations.
+ */
+public interface Supplier<T> {
+  T get();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
index d818a61..ca12d5a 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
@@ -21,7 +21,6 @@ package org.apache.beam.harness.test;
 import com.google.common.util.concurrent.ForwardingExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -31,6 +30,15 @@ import org.junit.runners.model.Statement;
  * allows for testing that tasks have exercised the appropriate shutdown logic.
  */
 public class TestExecutors {
+  public static TestExecutorService from(final ExecutorService staticExecutorService) {
+    return from(new Supplier<ExecutorService>() {
+      @Override
+      public ExecutorService get() {
+        return staticExecutorService;
+      }
+    });
+  }
+
   public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
     return new FromSupplier(executorServiceSuppler);
   }
@@ -48,7 +56,7 @@ public class TestExecutors {
     }
 
     @Override
-    public Statement apply(Statement statement, Description arg1) {
+    public Statement apply(final Statement statement, Description arg1) {
       return new Statement() {
         @Override
         public void evaluate() throws Throwable {

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
index 1381b55..f0c98e0 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
@@ -38,14 +38,19 @@ public class TestExecutorsTest {
   @Test
   public void testSuccessfulTermination() throws Throwable {
     ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final TestExecutorService testService = TestExecutors.from(service);
     final AtomicBoolean taskRan = new AtomicBoolean();
     testService
         .apply(
             new Statement() {
               @Override
               public void evaluate() throws Throwable {
-                testService.submit(() -> taskRan.set(true));
+                testService.submit(new Runnable() {
+                  @Override
+                  public void run() {
+                    taskRan.set(true);
+                  }
+                });
               }
             },
             null)
@@ -57,7 +62,7 @@ public class TestExecutorsTest {
   @Test
   public void testTaskBlocksForeverCausesFailure() throws Throwable {
     ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final TestExecutorService testService = TestExecutors.from(service);
     final AtomicBoolean taskStarted = new AtomicBoolean();
     final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
     try {
@@ -66,7 +71,12 @@ public class TestExecutorsTest {
               new Statement() {
                 @Override
                 public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
+                  testService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                      taskToRun();
+                    }
+                  });
                 }
 
                 private void taskToRun() {
@@ -94,7 +104,7 @@ public class TestExecutorsTest {
   @Test
   public void testStatementFailurePropagatedCleanly() throws Throwable {
     ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final TestExecutorService testService = TestExecutors.from(service);
     final RuntimeException exceptionToThrow = new RuntimeException();
     try {
       testService
@@ -118,7 +128,7 @@ public class TestExecutorsTest {
   public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
       throws Throwable {
     ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final TestExecutorService testService = TestExecutors.from(service);
     final AtomicBoolean taskStarted = new AtomicBoolean();
     final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
     final RuntimeException exceptionToThrow = new RuntimeException();
@@ -128,7 +138,12 @@ public class TestExecutorsTest {
               new Statement() {
                 @Override
                 public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
+                  testService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                      taskToRun();
+                    }
+                  });
                   throw exceptionToThrow;
                 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
index a7b362d..3df743a 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
@@ -20,8 +20,6 @@ package org.apache.beam.harness.test;
 
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 /** Utility methods which enable testing of {@link StreamObserver}s. */
 public class TestStreams {
@@ -32,9 +30,9 @@ public class TestStreams {
   public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
     return new Builder<>(new ForwardingCallStreamObserver<>(
         onNext,
-        TestStreams::noop,
-        TestStreams::noop,
-        TestStreams::returnTrue));
+        TestStreams.<Throwable>noopConsumer(),
+        TestStreams.noopRunnable(),
+        TestStreams.alwaysTrueSupplier()));
   }
 
   /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
@@ -72,7 +70,7 @@ public class TestStreams {
      * Returns a new {@link Builder} like this one with the specified
      * {@link StreamObserver#onError} callback.
      */
-    public Builder<T> withOnError(Runnable onError) {
+    public Builder<T> withOnError(final Runnable onError) {
       return new Builder<>(new ForwardingCallStreamObserver<>(
           observer.onNext,
           new Consumer<Throwable>() {
@@ -102,13 +100,38 @@ public class TestStreams {
   private static void noop() {
   }
 
+  private static Runnable noopRunnable() {
+    return new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+  }
+
   private static void noop(Throwable t) {
   }
 
+  private static <T> Consumer<T> noopConsumer() {
+    return new Consumer<T>() {
+      @Override
+      public void accept(T item) {
+      }
+    };
+  }
+
   private static boolean returnTrue() {
     return true;
   }
 
+  private static Supplier<Boolean> alwaysTrueSupplier() {
+    return new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return true;
+      }
+    };
+  }
+
   /** A {@link CallStreamObserver} which executes the supplied callbacks. */
   private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
     private final Consumer<T> onNext;

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
index f5741ae..c578397 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.harness.test;
 
-import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -35,8 +35,13 @@ import org.junit.runners.JUnit4;
 public class TestStreamsTest {
   @Test
   public void testOnNextIsCalled() {
-    AtomicBoolean onNextWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
+    final AtomicBoolean onNextWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(new Consumer<Boolean>() {
+      @Override
+      public void accept(Boolean item) {
+        onNextWasCalled.set(item);
+      }
+    }).build().onNext(true);
     assertTrue(onNextWasCalled.get());
   }
 
@@ -44,7 +49,12 @@ public class TestStreamsTest {
   public void testIsReadyIsCalled() {
     final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
     assertFalse(TestStreams.withOnNext(null)
-        .withIsReady(() -> isReadyWasCalled.getAndSet(true))
+        .withIsReady(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return isReadyWasCalled.getAndSet(true);
+          }
+        })
         .build()
         .isReady());
     assertTrue(isReadyWasCalled.get());
@@ -52,9 +62,14 @@ public class TestStreamsTest {
 
   @Test
   public void testOnCompletedIsCalled() {
-    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
     TestStreams.withOnNext(null)
-        .withOnCompleted(() -> onCompletedWasCalled.set(true))
+        .withOnCompleted(new Runnable() {
+          @Override
+          public void run() {
+            onCompletedWasCalled.set(true);
+          }
+        })
         .build()
         .onCompleted();
     assertTrue(onCompletedWasCalled.get());
@@ -63,9 +78,14 @@ public class TestStreamsTest {
   @Test
   public void testOnErrorRunnableIsCalled() {
     RuntimeException throwable = new RuntimeException();
-    AtomicBoolean onErrorWasCalled = new AtomicBoolean();
+    final AtomicBoolean onErrorWasCalled = new AtomicBoolean();
     TestStreams.withOnNext(null)
-        .withOnError(() -> onErrorWasCalled.set(true))
+        .withOnError(new Runnable() {
+          @Override
+          public void run() {
+            onErrorWasCalled.set(true);
+          }
+        })
         .build()
         .onError(throwable);
     assertTrue(onErrorWasCalled.get());
@@ -74,11 +94,16 @@ public class TestStreamsTest {
   @Test
   public void testOnErrorConsumerIsCalled() {
     RuntimeException throwable = new RuntimeException();
-    Collection<Throwable> onErrorWasCalled = new ArrayList<>();
+    final Collection<Throwable> onErrorWasCalled = new ArrayList<>();
     TestStreams.withOnNext(null)
-        .withOnError(onErrorWasCalled::add)
+        .withOnError(new Consumer<Throwable>() {
+          @Override
+          public void accept(Throwable item) {
+            onErrorWasCalled.add(item);
+          }
+        })
         .build()
         .onError(throwable);
-    assertThat(onErrorWasCalled, contains(throwable));
+    assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index 66c31a8..c926414 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -28,7 +28,7 @@ import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -91,7 +91,7 @@ public class FnHarnessTest {
             responseObserver.onCompleted();
           }
         });
-        return TestStreams.withOnNext(new Consumer<BeamFnApi.InstructionResponse>() {
+        return TestStreams.withOnNext(new Consumer<InstructionResponse>() {
           @Override
           public void accept(InstructionResponse t) {
             instructionResponses.add(t);

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 7df8925..9e21398 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -41,12 +41,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
@@ -263,7 +264,7 @@ public class BeamFnDataGrpcClientTest {
     Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
     CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
         TestStreams.withOnNext(
-            new Consumer<BeamFnApi.Elements>() {
+            new Consumer<Elements>() {
               @Override
               public void accept(BeamFnApi.Elements t) {
                 inboundServerValues.add(t);

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
index 3f66c4c..96648e9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestExecutors;
 import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.harness.test.TestStreams;

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
index 120a73d..05d8d5a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
 import org.apache.beam.harness.test.TestExecutors;
 import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.harness.test.TestStreams;

http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 62e4ec3..c6ab234 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -40,6 +40,7 @@
     <module>io</module>
     <module>maven-archetypes</module>
     <module>extensions</module>
+    <module>fn-execution</module>
     <!-- javadoc runs directly from the root parent as the last module
          in the build to be able to capture runner-specific javadoc.
     <module>javadoc</module> -->
@@ -53,7 +54,6 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
-        <module>fn-execution</module>
         <module>harness</module>
         <module>container</module>
         <module>java8tests</module>