You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/07 16:54:39 UTC

[23/50] [abbrv] beam git commit: Add sdks/java/fn-execution

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
deleted file mode 100644
index 85c64d0..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.model.Statement;
-
-/** Tests for {@link TestExecutors}. */
-@RunWith(JUnit4.class)
-public class TestExecutorsTest {
-  @Test
-  public void testSuccessfulTermination() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskRan = new AtomicBoolean();
-    testService
-        .apply(
-            new Statement() {
-              @Override
-              public void evaluate() throws Throwable {
-                testService.submit(() -> taskRan.set(true));
-              }
-            },
-            null)
-        .evaluate();
-    assertTrue(service.isTerminated());
-    assertTrue(taskRan.get());
-  }
-
-  @Test
-  public void testTaskBlocksForeverCausesFailure() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (IllegalStateException e) {
-      assertEquals(IllegalStateException.class, e.getClass());
-      assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedCleanly() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  throw exceptionToThrow;
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
-      throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
-                  throw exceptionToThrow;
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-      assertEquals(1, exceptionToThrow.getSuppressed().length);
-      assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
-      assertEquals(
-          "Test executor failed to shutdown cleanly.",
-          exceptionToThrow.getSuppressed()[0].getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
deleted file mode 100644
index f398286..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/** Utility methods which enable testing of {@link StreamObserver}s. */
-public class TestStreams {
-  /**
-   * Creates a test {@link CallStreamObserver}  {@link Builder} that forwards
-   * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
-   */
-  public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
-    return new Builder<>(new ForwardingCallStreamObserver<>(
-        onNext,
-        TestStreams::noop,
-        TestStreams::noop,
-        TestStreams::returnTrue));
-  }
-
-  /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
-  public static class Builder<T> {
-    private final ForwardingCallStreamObserver<T> observer;
-    private Builder(ForwardingCallStreamObserver<T> observer) {
-      this.observer = observer;
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link CallStreamObserver#isReady} callback.
-     */
-    public Builder<T> withIsReady(Supplier<Boolean> isReady) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          observer.onCompleted,
-          isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onCompleted} callback.
-     */
-    public Builder<T> withOnCompleted(Runnable onCompleted) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} callback.
-     */
-    public Builder<T> withOnError(Runnable onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          new Consumer<Throwable>() {
-            @Override
-            public void accept(Throwable t) {
-              onError.run();
-            }
-          },
-          observer.onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} consumer.
-     */
-    public Builder<T> withOnError(Consumer<Throwable> onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext, onError, observer.onCompleted, observer.isReady));
-    }
-
-    public CallStreamObserver<T> build() {
-      return observer;
-    }
-  }
-
-  private static void noop() {
-  }
-
-  private static void noop(Throwable t) {
-  }
-
-  private static boolean returnTrue() {
-    return true;
-  }
-
-  /** A {@link CallStreamObserver} which executes the supplied callbacks. */
-  private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
-    private final Consumer<T> onNext;
-    private final Supplier<Boolean> isReady;
-    private final Consumer<Throwable> onError;
-    private final Runnable onCompleted;
-
-    public ForwardingCallStreamObserver(
-        Consumer<T> onNext,
-        Consumer<Throwable> onError,
-        Runnable onCompleted,
-        Supplier<Boolean> isReady) {
-      this.onNext = onNext;
-      this.onError = onError;
-      this.onCompleted = onCompleted;
-      this.isReady = isReady;
-    }
-
-    @Override
-    public void onNext(T value) {
-      onNext.accept(value);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      onError.accept(t);
-    }
-
-    @Override
-    public void onCompleted() {
-      onCompleted.run();
-    }
-
-    @Override
-    public boolean isReady() {
-      return isReady.get();
-    }
-
-    @Override
-    public void setOnReadyHandler(Runnable onReadyHandler) {}
-
-    @Override
-    public void disableAutoInboundFlowControl() {}
-
-    @Override
-    public void request(int count) {}
-
-    @Override
-    public void setMessageCompression(boolean enable) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
deleted file mode 100644
index b684c90..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TestStreams}. */
-@RunWith(JUnit4.class)
-public class TestStreamsTest {
-  @Test
-  public void testOnNextIsCalled() {
-    AtomicBoolean onNextWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
-    assertTrue(onNextWasCalled.get());
-  }
-
-  @Test
-  public void testIsReadyIsCalled() {
-    final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
-    assertFalse(TestStreams.withOnNext(null)
-        .withIsReady(() -> isReadyWasCalled.getAndSet(true))
-        .build()
-        .isReady());
-    assertTrue(isReadyWasCalled.get());
-  }
-
-  @Test
-  public void testOnCompletedIsCalled() {
-    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnCompleted(() -> onCompletedWasCalled.set(true))
-        .build()
-        .onCompleted();
-    assertTrue(onCompletedWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorRunnableIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    AtomicBoolean onErrorWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnError(() -> onErrorWasCalled.set(true))
-        .build()
-        .onError(throwable);
-    assertTrue(onErrorWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorConsumerIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    Collection<Throwable> onErrorWasCalled = new ArrayList<>();
-    TestStreams.withOnNext(null)
-        .withOnError(onErrorWasCalled::add)
-        .build()
-        .onError(throwable);
-    assertThat(onErrorWasCalled, contains(throwable));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index e5af784..62e4ec3 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -53,6 +53,7 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
+        <module>fn-execution</module>
         <module>harness</module>
         <module>container</module>
         <module>java8tests</module>