You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/07 16:54:39 UTC
[23/50] [abbrv] beam git commit: Add sdks/java/fn-execution
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
deleted file mode 100644
index 85c64d0..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.model.Statement;
-
-/** Tests for {@link TestExecutors}. */
-@RunWith(JUnit4.class)
-public class TestExecutorsTest {
- @Test
- public void testSuccessfulTermination() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
- final AtomicBoolean taskRan = new AtomicBoolean();
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(() -> taskRan.set(true));
- }
- },
- null)
- .evaluate();
- assertTrue(service.isTerminated());
- assertTrue(taskRan.get());
- }
-
- @Test
- public void testTaskBlocksForeverCausesFailure() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
- final AtomicBoolean taskStarted = new AtomicBoolean();
- final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(this::taskToRun);
- }
-
- private void taskToRun() {
- taskStarted.set(true);
- try {
- while (true) {
- Thread.sleep(10000);
- }
- } catch (InterruptedException e) {
- taskWasInterrupted.set(true);
- return;
- }
- }
- },
- null)
- .evaluate();
- fail();
- } catch (IllegalStateException e) {
- assertEquals(IllegalStateException.class, e.getClass());
- assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
- }
- assertTrue(service.isShutdown());
- }
-
- @Test
- public void testStatementFailurePropagatedCleanly() throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
- final RuntimeException exceptionToThrow = new RuntimeException();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- throw exceptionToThrow;
- }
- },
- null)
- .evaluate();
- fail();
- } catch (RuntimeException thrownException) {
- assertSame(exceptionToThrow, thrownException);
- }
- assertTrue(service.isShutdown());
- }
-
- @Test
- public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
- throws Throwable {
- ExecutorService service = Executors.newSingleThreadExecutor();
- final TestExecutorService testService = TestExecutors.from(() -> service);
- final AtomicBoolean taskStarted = new AtomicBoolean();
- final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
- final RuntimeException exceptionToThrow = new RuntimeException();
- try {
- testService
- .apply(
- new Statement() {
- @Override
- public void evaluate() throws Throwable {
- testService.submit(this::taskToRun);
- throw exceptionToThrow;
- }
-
- private void taskToRun() {
- taskStarted.set(true);
- try {
- while (true) {
- Thread.sleep(10000);
- }
- } catch (InterruptedException e) {
- taskWasInterrupted.set(true);
- return;
- }
- }
- },
- null)
- .evaluate();
- fail();
- } catch (RuntimeException thrownException) {
- assertSame(exceptionToThrow, thrownException);
- assertEquals(1, exceptionToThrow.getSuppressed().length);
- assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
- assertEquals(
- "Test executor failed to shutdown cleanly.",
- exceptionToThrow.getSuppressed()[0].getMessage());
- }
- assertTrue(service.isShutdown());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
deleted file mode 100644
index f398286..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/** Utility methods which enable testing of {@link StreamObserver}s. */
-public class TestStreams {
- /**
- * Creates a test {@link CallStreamObserver} {@link Builder} that forwards
- * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
- */
- public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- onNext,
- TestStreams::noop,
- TestStreams::noop,
- TestStreams::returnTrue));
- }
-
- /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
- public static class Builder<T> {
- private final ForwardingCallStreamObserver<T> observer;
- private Builder(ForwardingCallStreamObserver<T> observer) {
- this.observer = observer;
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link CallStreamObserver#isReady} callback.
- */
- public Builder<T> withIsReady(Supplier<Boolean> isReady) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- observer.onError,
- observer.onCompleted,
- isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onCompleted} callback.
- */
- public Builder<T> withOnCompleted(Runnable onCompleted) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- observer.onError,
- onCompleted,
- observer.isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onError} callback.
- */
- public Builder<T> withOnError(Runnable onError) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext,
- new Consumer<Throwable>() {
- @Override
- public void accept(Throwable t) {
- onError.run();
- }
- },
- observer.onCompleted,
- observer.isReady));
- }
-
- /**
- * Returns a new {@link Builder} like this one with the specified
- * {@link StreamObserver#onError} consumer.
- */
- public Builder<T> withOnError(Consumer<Throwable> onError) {
- return new Builder<>(new ForwardingCallStreamObserver<>(
- observer.onNext, onError, observer.onCompleted, observer.isReady));
- }
-
- public CallStreamObserver<T> build() {
- return observer;
- }
- }
-
- private static void noop() {
- }
-
- private static void noop(Throwable t) {
- }
-
- private static boolean returnTrue() {
- return true;
- }
-
- /** A {@link CallStreamObserver} which executes the supplied callbacks. */
- private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
- private final Consumer<T> onNext;
- private final Supplier<Boolean> isReady;
- private final Consumer<Throwable> onError;
- private final Runnable onCompleted;
-
- public ForwardingCallStreamObserver(
- Consumer<T> onNext,
- Consumer<Throwable> onError,
- Runnable onCompleted,
- Supplier<Boolean> isReady) {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- this.isReady = isReady;
- }
-
- @Override
- public void onNext(T value) {
- onNext.accept(value);
- }
-
- @Override
- public void onError(Throwable t) {
- onError.accept(t);
- }
-
- @Override
- public void onCompleted() {
- onCompleted.run();
- }
-
- @Override
- public boolean isReady() {
- return isReady.get();
- }
-
- @Override
- public void setOnReadyHandler(Runnable onReadyHandler) {}
-
- @Override
- public void disableAutoInboundFlowControl() {}
-
- @Override
- public void request(int count) {}
-
- @Override
- public void setMessageCompression(boolean enable) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
deleted file mode 100644
index b684c90..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TestStreams}. */
-@RunWith(JUnit4.class)
-public class TestStreamsTest {
- @Test
- public void testOnNextIsCalled() {
- AtomicBoolean onNextWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
- assertTrue(onNextWasCalled.get());
- }
-
- @Test
- public void testIsReadyIsCalled() {
- final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
- assertFalse(TestStreams.withOnNext(null)
- .withIsReady(() -> isReadyWasCalled.getAndSet(true))
- .build()
- .isReady());
- assertTrue(isReadyWasCalled.get());
- }
-
- @Test
- public void testOnCompletedIsCalled() {
- AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(null)
- .withOnCompleted(() -> onCompletedWasCalled.set(true))
- .build()
- .onCompleted();
- assertTrue(onCompletedWasCalled.get());
- }
-
- @Test
- public void testOnErrorRunnableIsCalled() {
- RuntimeException throwable = new RuntimeException();
- AtomicBoolean onErrorWasCalled = new AtomicBoolean();
- TestStreams.withOnNext(null)
- .withOnError(() -> onErrorWasCalled.set(true))
- .build()
- .onError(throwable);
- assertTrue(onErrorWasCalled.get());
- }
-
- @Test
- public void testOnErrorConsumerIsCalled() {
- RuntimeException throwable = new RuntimeException();
- Collection<Throwable> onErrorWasCalled = new ArrayList<>();
- TestStreams.withOnNext(null)
- .withOnError(onErrorWasCalled::add)
- .build()
- .onError(throwable);
- assertThat(onErrorWasCalled, contains(throwable));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index e5af784..62e4ec3 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -53,6 +53,7 @@
<jdk>[1.8,)</jdk>
</activation>
<modules>
+ <module>fn-execution</module>
<module>harness</module>
<module>container</module>
<module>java8tests</module>