You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/04 17:28:00 UTC
[1/3] incubator-beam git commit: Fix direct runner pom & deps
Repository: incubator-beam
Updated Branches:
refs/heads/master 1ef53b17d -> 32970c985
Fix direct runner pom & deps
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78798ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78798ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78798ada
Branch: refs/heads/master
Commit: 78798ada581a6dfba3ac459a8fadda436f75721f
Parents: 892ead2
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 3 20:09:16 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 09:44:05 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 16 ----------------
1 file changed, 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78798ada/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 5c7dc38..79cf517 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -243,27 +243,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>java-sdk-all</artifactId>
- <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>runners-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- <version>${google-clients.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
[3/3] incubator-beam git commit: This closes #281
Posted by ke...@apache.org.
This closes #281
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32970c98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32970c98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32970c98
Branch: refs/heads/master
Commit: 32970c985f5b557101533ba2bb2d3d6352e284fa
Parents: 1ef53b1 78798ad
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 4 10:27:27 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 10:27:27 2016 -0700
----------------------------------------------------------------------
pom.xml | 6 +
runners/core-java/pom.xml | 208 +++++++++++++++++++
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 112 ++++++++++
runners/core-java/src/test/java/.placeholder | 0
runners/direct-java/pom.xml | 5 +
.../direct/GroupByKeyEvaluatorFactory.java | 2 +-
runners/flink/runner/pom.xml | 11 +
.../FlinkGroupAlsoByWindowWrapper.java | 8 +-
runners/pom.xml | 1 +
.../util/GroupAlsoByWindowViaWindowSetDoFn.java | 105 ----------
10 files changed, 348 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Create runners/core module for
artifact org.apache.beam:runners-core
Posted by ke...@apache.org.
Create runners/core module for artifact org.apache.beam:runners-core
This is strictly creating the module and moving one easy class to it.
Many of the utilities in org.apache.beam.sdk.util and subpackages should
move as developments allow.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/892ead2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/892ead2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/892ead2c
Branch: refs/heads/master
Commit: 892ead2c3b8bf7a53ee2d7570ba587453c186009
Parents: 6819dff
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 3 13:22:59 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 4 09:44:05 2016 -0700
----------------------------------------------------------------------
pom.xml | 6 +
runners/core-java/pom.xml | 208 +++++++++++++++++++
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 112 ++++++++++
runners/core-java/src/test/java/.placeholder | 0
runners/direct-java/pom.xml | 21 ++
.../direct/GroupByKeyEvaluatorFactory.java | 2 +-
runners/flink/runner/pom.xml | 11 +
.../FlinkGroupAlsoByWindowWrapper.java | 8 +-
runners/pom.xml | 1 +
.../util/GroupAlsoByWindowViaWindowSetDoFn.java | 105 ----------
10 files changed, 364 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b0b258b..27787da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>runners-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>google-cloud-dataflow-java-runner</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
new file mode 100644
index 0000000..b6f6f29
--- /dev/null
+++ b/runners/core-java/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>runners-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>runners-core</artifactId>
+ <name>Apache Beam :: Runners :: Core</name>
+ <description>Beam Runners Core provides utilities to aid runner authors.</description>
+
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals><goal>analyze-only</goal></goals>
+ <configuration>
+ <failOnWarning>true</failOnWarning>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+
+ <!-- Source plugin for generating source and test-source JARs. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <windowtitle>Beam Runners Core utilities ${project.version} API</windowtitle>
+ <doctitle>Beam Runners Core utilities for Java, version ${project.version}</doctitle>
+ <overview>../javadoc/overview.html</overview>
+
+ <subpackages>org.apache.beam.runners.core</subpackages>
+ <use>false</use>
+ <quiet>true</quiet>
+ <bottom><![CDATA[<br>]]></bottom>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ <executions>
+ <!-- In the first phase, we pick dependencies and relocate them. -->
+ <execution>
+ <id>bundle-and-repackage</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>true</shadeTestJar>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <!-- TODO: Once ready, change the following pattern to 'com'
+ only, exclude 'org.apache.beam.**', and remove
+ the second relocation. -->
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.beam.sdk.repackaged.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>org.apache.beam.sdk.repackaged.com.google.thirdparty</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+
+ <!-- In the second phase, we pick remaining dependencies and bundle
+ them without repackaging. -->
+ <execution>
+ <id>bundle-rest-without-repackaging</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>true</shadeTestJar>
+ <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.guava:guava</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ </dependency>
+
+ <!-- build dependencies -->
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
new file mode 100644
index 0000000..73244f7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.ReduceFnRunner;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetDoFn<
+ K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+ extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
+
+ public static <K, InputT, OutputT, W extends BoundedWindow>
+ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+ WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+ return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
+ }
+
+ protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+ createAggregator(
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+ protected final Aggregator<Long, Long> droppedDueToLateness =
+ createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+ private final WindowingStrategy<Object, W> windowingStrategy;
+ private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+ private GroupAlsoByWindowViaWindowSetDoFn(
+ WindowingStrategy<?, W> windowingStrategy,
+ SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
+ this.windowingStrategy = noWildcard;
+ this.reduceFn = reduceFn;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ KeyedWorkItem<K, InputT> element = c.element();
+
+ K key = c.element().key();
+ TimerInternals timerInternals = c.windowingInternals().timerInternals();
+
+ // It is the responsibility of the user of GroupAlsoByWindowViaWindowSetDoFn to only
+ // provide a WindowingInternals instance with the appropriate key type for StateInternals.
+ @SuppressWarnings("unchecked")
+ StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+
+ ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+ new ReduceFnRunner<>(
+ key,
+ windowingStrategy,
+ stateInternals,
+ timerInternals,
+ c.windowingInternals(),
+ droppedDueToClosedWindow,
+ reduceFn,
+ c.getPipelineOptions());
+
+ reduceFnRunner.processElements(element.elementsIterable());
+ for (TimerData timer : element.timersIterable()) {
+ reduceFnRunner.onTimer(timer);
+ }
+ reduceFnRunner.persist();
+ }
+
+ @Override
+ public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+ // Safe contravariant cast
+ @SuppressWarnings("unchecked")
+ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
+ (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
+ return asFn;
+ }
+
+ @Override
+ public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
+ return droppedDueToLateness;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/src/test/java/.placeholder
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/.placeholder b/runners/core-java/src/test/java/.placeholder
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 292cc56..5c7dc38 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -243,6 +243,27 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>runners-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <version>${google-clients.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
index 874ec17..9a08996 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItemCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index ab8d266..a1d5370 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -101,6 +101,17 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>runners-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>google-cloud-dataflow-java-runner</artifactId>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 8d9744f..0306aa1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.SystemReduceFn;
@@ -83,7 +83,7 @@ import java.util.Set;
* This class is the key class implementing all the windowing/triggering logic of Apache Beam.
* To provide full compatibility and support for all the windowing/triggering combinations offered by
* Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in
- * ({@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}.
+ * ({@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}).
* <p/>
* In a nutshell, when the execution arrives to this operator, we expect to have a stream <b>already
* grouped by key</b>. Each of the elements that enter here, registers a timer
@@ -95,7 +95,7 @@ import java.util.Set;
* When a watermark arrives, all the registered timers are checked to see which ones are ready to
* fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
* the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
- * list, and are fed into the {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}
+ * list, and are fed into the {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
* for furhter processing.
*/
public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
@@ -253,7 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
}
/**
- * Create the adequate {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn},
+ * Create the adequate {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn},
* <b> if not already created</b>.
* If a {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn} was provided, then
* a function with that combiner is created, so that elements are combined as they arrive. This is
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 74812e8..d2d68df 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
<name>Apache Beam :: Runners</name>
<modules>
+ <module>core-java</module>
<module>direct-java</module>
<module>flink</module>
<module>spark</module>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
deleted file mode 100644
index c24f6da..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
- * {@link ReduceFnRunner}.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaWindowSetDoFn<
- K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
- extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
-
- public static <K, InputT, OutputT, W extends BoundedWindow>
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
- WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
- }
-
- protected final Aggregator<Long, Long> droppedDueToClosedWindow =
- createAggregator(
- GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
- protected final Aggregator<Long, Long> droppedDueToLateness =
- createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
-
- private final WindowingStrategy<Object, W> windowingStrategy;
- private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
- private GroupAlsoByWindowViaWindowSetDoFn(
- WindowingStrategy<?, W> windowingStrategy,
- SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- @SuppressWarnings("unchecked")
- WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
- this.windowingStrategy = noWildcard;
- this.reduceFn = reduceFn;
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KeyedWorkItem<K, InputT> element = c.element();
-
- K key = c.element().key();
- TimerInternals timerInternals = c.windowingInternals().timerInternals();
-
- // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only
- // provide a WindowingInternals instance with the appropriate key type for StateInternals.
- @SuppressWarnings("unchecked")
- StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
-
- ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
- new ReduceFnRunner<>(
- key,
- windowingStrategy,
- stateInternals,
- timerInternals,
- c.windowingInternals(),
- droppedDueToClosedWindow,
- reduceFn,
- c.getPipelineOptions());
-
- reduceFnRunner.processElements(element.elementsIterable());
- for (TimerData timer : element.timersIterable()) {
- reduceFnRunner.onTimer(timer);
- }
- reduceFnRunner.persist();
- }
-
- @Override
- public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
- (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
- return asFn;
- }
-
- @Override
- public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
- return droppedDueToLateness;
- }
-}