You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/07 00:07:11 UTC
[2/2] beam git commit: Migrate shared Fn Execution code to Java7
Migrate shared Fn Execution code to Java7
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/012c2e64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/012c2e64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/012c2e64
Branch: refs/heads/master
Commit: 012c2e64ceb691fd69649ba8cc02d9a2f16e519f
Parents: 766b4f3
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 18 11:26:48 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 6 16:06:57 2017 -0800
----------------------------------------------------------------------
runners/java-fn-execution/pom.xml | 14 ------
.../runners/fnexecution/ServerFactoryTest.java | 45 ++++++++++++++-----
runners/pom.xml | 2 +-
sdks/java/fn-execution/pom.xml | 20 ++-------
.../org/apache/beam/harness/test/Consumer.java | 26 +++++++++++
.../org/apache/beam/harness/test/Supplier.java | 26 +++++++++++
.../apache/beam/harness/test/TestExecutors.java | 12 ++++-
.../beam/harness/test/TestExecutorsTest.java | 29 +++++++++---
.../apache/beam/harness/test/TestStreams.java | 35 ++++++++++++---
.../beam/harness/test/TestStreamsTest.java | 47 +++++++++++++++-----
.../apache/beam/fn/harness/FnHarnessTest.java | 4 +-
.../harness/data/BeamFnDataGrpcClientTest.java | 5 ++-
.../stream/BufferingStreamObserverTest.java | 2 +-
.../stream/DirectStreamObserverTest.java | 2 +-
sdks/java/pom.xml | 2 +-
15 files changed, 196 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index bd4fcf0..f57c58b 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,20 +32,6 @@
<packaging>jar</packaging>
- <build>
- <plugins>
- <plugin>
- <!-- Override Beam parent to allow Java8 -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index aa8d246..b78e88a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -39,6 +39,7 @@ import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.harness.test.Consumer;
import org.apache.beam.harness.test.TestStreams;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
@@ -74,24 +75,48 @@ public class ServerFactoryTest {
Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
Endpoints.ApiServiceDescriptor.newBuilder();
- Collection<Elements> serverElements = new ArrayList<>();
- CountDownLatch clientHangedUp = new CountDownLatch(1);
+ final Collection<Elements> serverElements = new ArrayList<>();
+ final CountDownLatch clientHangedUp = new CountDownLatch(1);
CallStreamObserver<Elements> serverInboundObserver =
- TestStreams.withOnNext(serverElements::add)
- .withOnCompleted(clientHangedUp::countDown)
- .build();
+ TestStreams.withOnNext(
+ new Consumer<Elements>() {
+ @Override
+ public void accept(Elements item) {
+ serverElements.add(item);
+ }
+ })
+ .withOnCompleted(
+ new Runnable() {
+ @Override
+ public void run() {
+ clientHangedUp.countDown();
+ }
+ })
+ .build();
TestDataService service = new TestDataService(serverInboundObserver);
Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
assertFalse(server.isShutdown());
ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
- Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
- CountDownLatch serverHangedUp = new CountDownLatch(1);
+ final Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
+ final CountDownLatch serverHangedUp = new CountDownLatch(1);
CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
- TestStreams.withOnNext(clientElements::add)
- .withOnCompleted(serverHangedUp::countDown)
- .build();
+ TestStreams.withOnNext(
+ new Consumer<Elements>() {
+ @Override
+ public void accept(Elements item) {
+ clientElements.add(item);
+ }
+ })
+ .withOnCompleted(
+ new Runnable() {
+ @Override
+ public void run() {
+ serverHangedUp.countDown();
+ }
+ })
+ .build();
StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver);
StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index df3faa9..47f3c0e 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>core-construction-java</module>
<module>core-java</module>
+ <module>java-fn-execution</module>
<module>local-artifact-service-java</module>
<module>reference</module>
<module>direct-java</module>
@@ -63,7 +64,6 @@
<jdk>[1.8,)</jdk>
</activation>
<modules>
- <module>java-fn-execution</module>
<module>gearpump</module>
</modules>
</profile>
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index 9929c29..7c203eb 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -27,27 +27,13 @@
</parent>
<artifactId>beam-sdks-java-fn-execution</artifactId>
- <name>Apache Beam :: SDKs :: Java :: Harness Core</name>
- <description>Contains code shared across the Beam Java SDK Harness and the Java Runner Harness
- libraries.
+ <name>Apache Beam :: SDKs :: Java :: Fn Execution</name>
+ <description>Contains code shared across the Beam Java SDK Harness and Java Runners to execute using
+ the Beam Portability Framework.
</description>
<packaging>jar</packaging>
- <build>
- <plugins>
- <plugin>
- <!-- Override Beam parent to allow Java8 -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
new file mode 100644
index 0000000..279fc29
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+/**
+ * A fork of the Java 8 Consumer interface. This exists to enable migration for existing consumers.
+ */
+public interface Consumer<T> {
+ void accept(T item);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
new file mode 100644
index 0000000..629afc2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+/**
+ * A fork of the Java 8 Supplier interface, to enable migrations.
+ */
+public interface Supplier<T> {
+ T get();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
index d818a61..ca12d5a 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
@@ -21,7 +21,6 @@ package org.apache.beam.harness.test;
import com.google.common.util.concurrent.ForwardingExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -31,6 +30,15 @@ import org.junit.runners.model.Statement;
* allows for testing that tasks have exercised the appropriate shutdown logic.
*/
public class TestExecutors {
+ public static TestExecutorService from(final ExecutorService staticExecutorService) {
+ return from(new Supplier<ExecutorService>() {
+ @Override
+ public ExecutorService get() {
+ return staticExecutorService;
+ }
+ });
+ }
+
public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
return new FromSupplier(executorServiceSuppler);
}
@@ -48,7 +56,7 @@ public class TestExecutors {
}
@Override
- public Statement apply(Statement statement, Description arg1) {
+ public Statement apply(final Statement statement, Description arg1) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
index 1381b55..f0c98e0 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
@@ -38,14 +38,19 @@ public class TestExecutorsTest {
@Test
public void testSuccessfulTermination() throws Throwable {
ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
+ final TestExecutorService testService = TestExecutors.from(service);
final AtomicBoolean taskRan = new AtomicBoolean();
testService
.apply(
new Statement() {
@Override
public void evaluate() throws Throwable {
- testService.submit(() -> taskRan.set(true));
+ testService.submit(new Runnable() {
+ @Override
+ public void run() {
+ taskRan.set(true);
+ }
+ });
}
},
null)
@@ -57,7 +62,7 @@ public class TestExecutorsTest {
@Test
public void testTaskBlocksForeverCausesFailure() throws Throwable {
ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
+ final TestExecutorService testService = TestExecutors.from(service);
final AtomicBoolean taskStarted = new AtomicBoolean();
final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
try {
@@ -66,7 +71,12 @@ public class TestExecutorsTest {
new Statement() {
@Override
public void evaluate() throws Throwable {
- testService.submit(this::taskToRun);
+ testService.submit(new Runnable() {
+ @Override
+ public void run() {
+ taskToRun();
+ }
+ });
}
private void taskToRun() {
@@ -94,7 +104,7 @@ public class TestExecutorsTest {
@Test
public void testStatementFailurePropagatedCleanly() throws Throwable {
ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
+ final TestExecutorService testService = TestExecutors.from(service);
final RuntimeException exceptionToThrow = new RuntimeException();
try {
testService
@@ -118,7 +128,7 @@ public class TestExecutorsTest {
public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
throws Throwable {
ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
+ final TestExecutorService testService = TestExecutors.from(service);
final AtomicBoolean taskStarted = new AtomicBoolean();
final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
final RuntimeException exceptionToThrow = new RuntimeException();
@@ -128,7 +138,12 @@ public class TestExecutorsTest {
new Statement() {
@Override
public void evaluate() throws Throwable {
- testService.submit(this::taskToRun);
+ testService.submit(new Runnable() {
+ @Override
+ public void run() {
+ taskToRun();
+ }
+ });
throw exceptionToThrow;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
index a7b362d..3df743a 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
@@ -20,8 +20,6 @@ package org.apache.beam.harness.test;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
/** Utility methods which enable testing of {@link StreamObserver}s. */
public class TestStreams {
@@ -32,9 +30,9 @@ public class TestStreams {
public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
return new Builder<>(new ForwardingCallStreamObserver<>(
onNext,
- TestStreams::noop,
- TestStreams::noop,
- TestStreams::returnTrue));
+ TestStreams.<Throwable>noopConsumer(),
+ TestStreams.noopRunnable(),
+ TestStreams.alwaysTrueSupplier()));
}
/** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
@@ -72,7 +70,7 @@ public class TestStreams {
* Returns a new {@link Builder} like this one with the specified
* {@link StreamObserver#onError} callback.
*/
- public Builder<T> withOnError(Runnable onError) {
+ public Builder<T> withOnError(final Runnable onError) {
return new Builder<>(new ForwardingCallStreamObserver<>(
observer.onNext,
new Consumer<Throwable>() {
@@ -102,13 +100,38 @@ public class TestStreams {
private static void noop() {
}
+ private static Runnable noopRunnable() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
+ }
+
private static void noop(Throwable t) {
}
+ private static <T> Consumer<T> noopConsumer() {
+ return new Consumer<T>() {
+ @Override
+ public void accept(T item) {
+ }
+ };
+ }
+
private static boolean returnTrue() {
return true;
}
+ private static Supplier<Boolean> alwaysTrueSupplier() {
+ return new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return true;
+ }
+ };
+ }
+
/** A {@link CallStreamObserver} which executes the supplied callbacks. */
private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
private final Consumer<T> onNext;
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
index f5741ae..c578397 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.harness.test;
-import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -26,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,8 +35,13 @@ import org.junit.runners.JUnit4;
public class TestStreamsTest {
@Test
public void testOnNextIsCalled() {
- AtomicBoolean onNextWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
+ final AtomicBoolean onNextWasCalled = new AtomicBoolean();
+ TestStreams.withOnNext(new Consumer<Boolean>() {
+ @Override
+ public void accept(Boolean item) {
+ onNextWasCalled.set(item);
+ }
+ }).build().onNext(true);
assertTrue(onNextWasCalled.get());
}
@@ -44,7 +49,12 @@ public class TestStreamsTest {
public void testIsReadyIsCalled() {
final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
assertFalse(TestStreams.withOnNext(null)
- .withIsReady(() -> isReadyWasCalled.getAndSet(true))
+ .withIsReady(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return isReadyWasCalled.getAndSet(true);
+ }
+ })
.build()
.isReady());
assertTrue(isReadyWasCalled.get());
@@ -52,9 +62,14 @@ public class TestStreamsTest {
@Test
public void testOnCompletedIsCalled() {
- AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+ final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
TestStreams.withOnNext(null)
- .withOnCompleted(() -> onCompletedWasCalled.set(true))
+ .withOnCompleted(new Runnable() {
+ @Override
+ public void run() {
+ onCompletedWasCalled.set(true);
+ }
+ })
.build()
.onCompleted();
assertTrue(onCompletedWasCalled.get());
@@ -63,9 +78,14 @@ public class TestStreamsTest {
@Test
public void testOnErrorRunnableIsCalled() {
RuntimeException throwable = new RuntimeException();
- AtomicBoolean onErrorWasCalled = new AtomicBoolean();
+ final AtomicBoolean onErrorWasCalled = new AtomicBoolean();
TestStreams.withOnNext(null)
- .withOnError(() -> onErrorWasCalled.set(true))
+ .withOnError(new Runnable() {
+ @Override
+ public void run() {
+ onErrorWasCalled.set(true);
+ }
+ })
.build()
.onError(throwable);
assertTrue(onErrorWasCalled.get());
@@ -74,11 +94,16 @@ public class TestStreamsTest {
@Test
public void testOnErrorConsumerIsCalled() {
RuntimeException throwable = new RuntimeException();
- Collection<Throwable> onErrorWasCalled = new ArrayList<>();
+ final Collection<Throwable> onErrorWasCalled = new ArrayList<>();
TestStreams.withOnNext(null)
- .withOnError(onErrorWasCalled::add)
+ .withOnError(new Consumer<Throwable>() {
+ @Override
+ public void accept(Throwable item) {
+ onErrorWasCalled.add(item);
+ }
+ })
.build()
.onError(throwable);
- assertThat(onErrorWasCalled, contains(throwable));
+ assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index 66c31a8..c926414 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -28,7 +28,7 @@ import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
import org.apache.beam.harness.test.TestStreams;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -91,7 +91,7 @@ public class FnHarnessTest {
responseObserver.onCompleted();
}
});
- return TestStreams.withOnNext(new Consumer<BeamFnApi.InstructionResponse>() {
+ return TestStreams.withOnNext(new Consumer<InstructionResponse>() {
@Override
public void accept(InstructionResponse t) {
instructionResponses.add(t);
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 7df8925..9e21398 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -41,12 +41,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.harness.test.Consumer;
import org.apache.beam.harness.test.TestStreams;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
@@ -263,7 +264,7 @@ public class BeamFnDataGrpcClientTest {
Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
TestStreams.withOnNext(
- new Consumer<BeamFnApi.Elements>() {
+ new Consumer<Elements>() {
@Override
public void accept(BeamFnApi.Elements t) {
inboundServerValues.add(t);
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
index 3f66c4c..96648e9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
import org.apache.beam.harness.test.TestExecutors;
import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
import org.apache.beam.harness.test.TestStreams;
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
index 120a73d..05d8d5a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import org.apache.beam.harness.test.Consumer;
import org.apache.beam.harness.test.TestExecutors;
import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
import org.apache.beam.harness.test.TestStreams;
http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 62e4ec3..c6ab234 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -40,6 +40,7 @@
<module>io</module>
<module>maven-archetypes</module>
<module>extensions</module>
+ <module>fn-execution</module>
<!-- javadoc runs directly from the root parent as the last module
in the build to be able to capture runner-specific javadoc.
<module>javadoc</module> -->
@@ -53,7 +54,6 @@
<jdk>[1.8,)</jdk>
</activation>
<modules>
- <module>fn-execution</module>
<module>harness</module>
<module>container</module>
<module>java8tests</module>