You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/18 01:23:51 UTC
[4/4] beam git commit: Add a Local Java Core Module
Add a Local Java Core Module
This contains utilities shared by runners which execute locally. These
are expected to be the DirectRunner and the ReferenceRunner, both of
which can utilize shared in-memory representations of the state of an
executing Pipeline.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4babcbf4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4babcbf4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4babcbf4
Branch: refs/heads/master
Commit: 4babcbf46e0a4bada90a77f908eeaa7870138ea9
Parents: 50e58f0
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 25 16:31:09 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 17 17:23:41 2017 -0800
----------------------------------------------------------------------
pom.xml | 6 ++
runners/direct-java/pom.xml | 5 ++
.../beam/runners/direct/CommittedBundle.java | 9 ++-
.../direct/ImmutableListBundleFactory.java | 8 +++
.../beam/runners/direct/WatermarkManager.java | 4 +-
.../direct/ImmutableListBundleFactoryTest.java | 4 +-
runners/local-java/pom.xml | 62 ++++++++++++++++++++
.../apache/beam/runners/core/local/Bundle.java | 33 +++++++++++
.../beam/runners/core/local/package-info.java | 22 +++++++
runners/pom.xml | 1 +
10 files changed, 145 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7bf9d66..c5223df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -702,6 +702,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-local-java-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 752af44..ff7b4a8 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -182,6 +182,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-local-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 79a96fe..ccb031f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.local.Bundle;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
@@ -33,7 +34,7 @@ import org.joda.time.Instant;
* a part of at a later point.
* @param <T> the type of elements contained within this bundle
*/
-interface CommittedBundle<T> {
+interface CommittedBundle<T> extends Bundle<T> {
/**
* Returns the PCollection that the elements of this bundle belong to.
*/
@@ -52,10 +53,8 @@ interface CommittedBundle<T> {
*/
Iterable<WindowedValue<T>> getElements();
- /**
- * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
- */
- Instant getMinTimestamp();
+ @Override
+ Instant getMinimumTimestamp();
/**
* Returns the processing time output watermark at the time the producing {@link PTransform}
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 73734d0..bea7699 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
+import java.util.Iterator;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -128,6 +130,12 @@ class ImmutableListBundleFactory implements BundleFactory {
}
@Override
+ @Nonnull
+ public Iterator<WindowedValue<T>> iterator() {
+ return getElements().iterator();
+ }
+
+ @Override
public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
return create(
getPCollection(),
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 599b74f..a1395a9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -274,7 +274,7 @@ class WatermarkManager {
if (!pendingElements.isEmpty()) {
minInputWatermark =
INSTANT_ORDERING.min(
- minInputWatermark, pendingElements.firstEntry().getElement().getMinTimestamp());
+ minInputWatermark, pendingElements.firstEntry().getElement().getMinimumTimestamp());
}
Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
currentWatermark.set(newWatermark);
@@ -1511,7 +1511,7 @@ class WatermarkManager {
@Override
public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
return ComparisonChain.start()
- .compare(o1.getMinTimestamp(), o2.getMinTimestamp())
+ .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp())
.result();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 4a392db..83426c6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -120,7 +120,7 @@ public class ImmutableListBundleFactoryTest {
// Sanity check that the test is meaningful.
assertThat(minElementTs, not(equalTo(commitTime)));
- assertThat(committed.getMinTimestamp(), equalTo(minElementTs));
+ assertThat(committed.getMinimumTimestamp(), equalTo(minElementTs));
assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime));
return committed;
@@ -190,7 +190,7 @@ public class ImmutableListBundleFactoryTest {
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
equalTo(committed.getSynchronizedProcessingOutputWatermark()));
- assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L)));
+ assertThat(withed.getMinimumTimestamp(), equalTo(new Instant(2048L)));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/local-java/pom.xml b/runners/local-java/pom.xml
new file mode 100644
index 0000000..b988fd9
--- /dev/null
+++ b/runners/local-java/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-parent</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-runners-local-java-core</artifactId>
+
+ <name>Apache Beam :: Runners :: Local Java Core</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- The Local Java Core artifact should carefully manage the classes made available by this
+ dependency. The Java SDK should be used to provide common utilities (e.g. Coder, WindowedValue)
+ but should not be used within this library to execute any UDFs.
+ TODO: Add an APISurfaceTest to force this to be the case, if possible. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <!-- build dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
----------------------------------------------------------------------
diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
new file mode 100644
index 0000000..98e1e8a
--- /dev/null
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/Bundle.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.local;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/** An immutable collection of elements which are part of a {@code PCollection}. */
+public interface Bundle<T> extends Iterable<WindowedValue<T>> {
+ /**
+ * Return the minimum timestamp among elements in this bundle.
+ *
+ * <p>This should be equivalent to iterating over all of the elements within a bundle and
+ * selecting the minimum timestamp from among them.
+ */
+ Instant getMinimumTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
----------------------------------------------------------------------
diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
new file mode 100644
index 0000000..ffd2e54
--- /dev/null
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/core/local/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities useful when executing a pipeline on a single machine.
+ */
+package org.apache.beam.runners.core.local;
http://git-wip-us.apache.org/repos/asf/beam/blob/4babcbf4/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 47f3c0e..814b3f1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -36,6 +36,7 @@
<module>core-construction-java</module>
<module>core-java</module>
<module>java-fn-execution</module>
+ <module>local-java</module>
<module>local-artifact-service-java</module>
<module>reference</module>
<module>direct-java</module>