You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/18 01:23:51 UTC

[4/4] beam git commit: Add a Local Java Core Module

Add a Local Java Core Module

This module contains utilities shared by runners which execute locally.
Those runners are expected to be the DirectRunner and the
ReferenceRunner, both of which can utilize shared in-memory
representations of the state of an executing Pipeline.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4babcbf4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4babcbf4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4babcbf4

Branch: refs/heads/master
Commit: 4babcbf46e0a4bada90a77f908eeaa7870138ea9
Parents: 50e58f0
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 25 16:31:09 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 17 17:23:41 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |  6 ++
 runners/direct-java/pom.xml                     |  5 ++
 .../beam/runners/direct/CommittedBundle.java    |  9 ++-
 .../direct/ImmutableListBundleFactory.java      |  8 +++
 .../beam/runners/direct/WatermarkManager.java   |  4 +-
 .../direct/ImmutableListBundleFactoryTest.java  |  4 +-
 runners/local-java/pom.xml                      | 62 ++++++++++++++++++++
 .../apache/beam/runners/core/local/Bundle.java  | 33 +++++++++++
 .../beam/runners/core/local/package-info.java   | 22 +++++++
 runners/pom.xml                                 |  1 +
 10 files changed, 145 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7bf9d66..c5223df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -702,6 +702,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-local-java-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-direct-java</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 752af44..ff7b4a8 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -182,6 +182,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-local-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 79a96fe..ccb031f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.local.Bundle;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -33,7 +34,7 @@ import org.joda.time.Instant;
  * a part of at a later point.
  * @param <T> the type of elements contained within this bundle
  */
-interface CommittedBundle<T> {
+interface CommittedBundle<T> extends Bundle<T> {
   /**
    * Returns the PCollection that the elements of this bundle belong to.
    */
@@ -52,10 +53,8 @@ interface CommittedBundle<T> {
    */
   Iterable<WindowedValue<T>> getElements();
 
-  /**
-   * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
-   */
-  Instant getMinTimestamp();
+  @Override
+  Instant getMinimumTimestamp();
 
   /**
    * Returns the processing time output watermark at the time the producing {@link PTransform}

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 73734d0..bea7699 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import java.util.Iterator;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -128,6 +130,12 @@ class ImmutableListBundleFactory implements BundleFactory {
     }
 
     @Override
+    @Nonnull
+    public Iterator<WindowedValue<T>> iterator() {
+      return getElements().iterator();
+    }
+
+    @Override
     public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
       return create(
           getPCollection(),

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 599b74f..a1395a9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -274,7 +274,7 @@ class WatermarkManager {
       if (!pendingElements.isEmpty()) {
         minInputWatermark =
             INSTANT_ORDERING.min(
-                minInputWatermark, pendingElements.firstEntry().getElement().getMinTimestamp());
+                minInputWatermark, pendingElements.firstEntry().getElement().getMinimumTimestamp());
       }
       Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
       currentWatermark.set(newWatermark);
@@ -1511,7 +1511,7 @@ class WatermarkManager {
     @Override
     public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
       return ComparisonChain.start()
-          .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
+          .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp())
           .result();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a392db..83426c6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -120,7 +120,7 @@ public class ImmutableListBundleFactoryTest {
 
     // Sanity check that the test is meaningful.
     assertThat(minElementTs, not(equalTo(commitTime)));
-    assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+    assertThat(committed.getMinimumTimestamp(), equalTo(minElementTs));
     assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
 
     return committed;
@@ -190,7 +190,7 @@ public class ImmutableListBundleFactoryTest {
     assertThat(
         withed.getSynchronizedProcessingOutputWatermark(),
         equalTo(committed.getSynchronizedProcessingOutputWatermark()));
-    assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
+    assertThat(withed.getMinimumTimestamp(), equalTo(new Instant(2048L)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/local-java/pom.xml b/runners/local-java/pom.xml
new file mode 100644
index 0000000..b988fd9
--- /dev/null
+++ b/runners/local-java/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-local-java-core</artifactId>
+
+  <name>Apache Beam :: Runners :: Local Java Core</name>
+
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <!-- The Local Java Core artifact should carefully manage the classes made available by this
+     dependency. The Java SDK should be used to provide common utilities (e.g. Coder, WindowedValue)
+     but should not be used within this library to execute any UDFs.
+     TODO: Add an APISurfaceTest to force this to be the case, if possible. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
----------------------------------------------------------------------
diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
new file mode 100644
index 0000000..98e1e8a
--- /dev/null
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.local;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/** An immutable collection of elements which are part of a {@code PCollection}. */
+public interface Bundle<T> extends Iterable<WindowedValue<T>> {
+  /**
+   * Return the minimum timestamp among elements in this bundle.
+   *
+   * <p>This should be equivalent to iterating over all of the elements within a bundle and
+   * selecting the minimum timestamp from among them.
+   */
+  Instant getMinimumTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
----------------------------------------------------------------------
diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
new file mode 100644
index 0000000..ffd2e54
--- /dev/null
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities useful when executing a pipeline on a single machine.
+ */
+package org.apache.beam.runners.core.local;

http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 47f3c0e..814b3f1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -36,6 +36,7 @@
     <module>core-construction-java</module>
     <module>core-java</module>
     <module>java-fn-execution</module>
+    <module>local-java</module>
     <module>local-artifact-service-java</module>
     <module>reference</module>
     <module>direct-java</module>