You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/30 12:49:27 UTC
[1/3] incubator-beam git commit: Clean up Flink Runner POMs
Repository: incubator-beam
Updated Branches:
refs/heads/master 36a27f538 -> 1cd64bb1a
Clean up Flink Runner POMs
This moves the Flink Kafka connector dependency from the runner POM to
the examples POM and restricts the flink-test-utils dependency to the
test scope.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c24d5ff5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c24d5ff5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c24d5ff5
Branch: refs/heads/master
Commit: c24d5ff52ba1720244c49aec8d9c29681d17121f
Parents: 4fe7010
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun May 29 08:29:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 30 14:23:19 2016 +0200
----------------------------------------------------------------------
runners/flink/examples/pom.xml | 6 ++++++
runners/flink/runner/pom.xml | 9 +++------
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c24d5ff5/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index b000ab1..47e0167 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -75,6 +75,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c24d5ff5/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index b60cba1..82104fd 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -51,12 +51,6 @@
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
- <!-- Libraries not part of Flink which need to be included by the user (see flink-examples) -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
<!-- Beam -->
<dependency>
@@ -124,6 +118,7 @@
</exclusions>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
@@ -135,7 +130,9 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>${flink.version}</version>
+ <scope>test</scope>
</dependency>
+
<!-- Optional Pipeline Registration -->
<dependency>
<groupId>com.google.auto.service</groupId>
[2/3] incubator-beam git commit: Remove Some Code Left Over from Earlier Refactoring
Posted by al...@apache.org.
Remove Some Code Left Over from Earlier Refactoring
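This removes unused code: the batch translators for Write, AvroIO.Write
and TextIO.Write, the SinkOutputFormat wrapper, and the now-unneeded
flink-avro dependency. It also reduces FlinkBatchTransformTranslators
and its getTranslator method to package-private visibility.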
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fe7010b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fe7010b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fe7010b
Branch: refs/heads/master
Commit: 4fe7010bdb7692914fc6c2821c95caea0cab770d
Parents: 36a27f5
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun May 29 08:04:39 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 30 14:23:19 2016 +0200
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 5 -
.../FlinkBatchTransformTranslators.java | 131 +------------------
.../translation/wrappers/SinkOutputFormat.java | 97 --------------
3 files changed, 3 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 757ac9c..b60cba1 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -57,11 +57,6 @@
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
<!-- Beam -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 8358807..200e4af 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFun
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -36,11 +35,8 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
@@ -69,27 +65,19 @@ import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -100,7 +88,7 @@ import java.util.Map;
* Translators for transforming {@link PTransform PTransforms} to
* Flink {@link DataSet DataSets}.
*/
-public class FlinkBatchTransformTranslators {
+class FlinkBatchTransformTranslators {
// --------------------------------------------------------------------------------------------
// Transform Translator Registry
@@ -128,7 +116,7 @@ public class FlinkBatchTransformTranslators {
}
- public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+ static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
PTransform<?, ?> transform) {
return TRANSLATORS.get(transform.getClass());
}
@@ -154,119 +142,6 @@ public class FlinkBatchTransformTranslators {
}
}
- private static class WriteSinkTranslatorBatch<T>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
- @Override
- public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- String name = transform.getName();
- PValue input = context.getInput(transform);
- DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
- inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
- .name(name);
- }
- }
-
- private static class AvroIOWriteTranslatorBatch<T> implements
- FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
-
- @Override
- public void translateNode(
- AvroIO.Write.Bound<T> transform,
- FlinkBatchTranslationContext context) {
- DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn(
- "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
- filenameSuffix);
- LOG.warn(
- "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
- shardNameTemplate);
-
- // This is super hacky, but unfortunately we cannot get the type otherwise
- Class<T> extractedAvroType;
- try {
- Field typeField = transform.getClass().getDeclaredField("type");
- typeField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Class<T> avroType = (Class<T>) typeField.get(transform);
- extractedAvroType = avroType;
- } catch (NoSuchFieldException | IllegalAccessException e) {
- // we know that the field is there and it is accessible
- throw new RuntimeException("Could not access type from AvroIO.Bound", e);
- }
-
- MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
- new MapFunction<WindowedValue<T>, T>() {
- @Override
- public T map(WindowedValue<T> value) throws Exception {
- return value.getValue();
- }
- }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
-
-
- DataSink<T> dataSink = valueStream.output(
- new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
-
- if (numShards > 0) {
- dataSink.setParallelism(numShards);
- }
- }
- }
-
- private static class TextIOWriteTranslatorBatch<T>
- implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
- @Override
- public void translateNode(
- TextIO.Write.Bound<T> transform,
- FlinkBatchTranslationContext context) {
- PValue input = context.getInput(transform);
- DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- boolean needsValidation = transform.needsValidation();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn(
- "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
- needsValidation);
- LOG.warn(
- "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
- filenameSuffix);
- LOG.warn(
- "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
- shardNameTemplate);
-
- MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
- new MapFunction<WindowedValue<T>, T>() {
- @Override
- public T map(WindowedValue<T> value) throws Exception {
- return value.getValue();
- }
- }).returns(new CoderTypeInformation<>(transform.getCoder()));
-
- DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
-
- if (numShards > 0) {
- dataSink.setParallelism(numShards);
- }
- }
- }
-
private static class WindowBoundTranslatorBatch<T>
implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
@@ -431,7 +306,7 @@ public class FlinkBatchTransformTranslators {
private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
@Override
public List<T> createAccumulator() {
- return new ArrayList<T>();
+ return new ArrayList<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
deleted file mode 100644
index c0a7132..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-/**
- * Wrapper for executing a {@link Sink} on Flink as an {@link OutputFormat}.
- *
- * @param <T> The type of the incoming records.
- */
-public class SinkOutputFormat<T> implements OutputFormat<WindowedValue<T>> {
-
- private final Sink<T> sink;
-
- private final SerializedPipelineOptions serializedOptions;
-
- private Sink.WriteOperation<T, ?> writeOperation;
- private Sink.Writer<T, ?> writer;
-
- private AbstractID uid = new AbstractID();
-
- public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
- this.sink = transform.getSink();
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- }
-
- @Override
- public void configure(Configuration configuration) {
- writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());
- try {
- writeOperation.initialize(serializedOptions.getPipelineOptions());
- } catch (Exception e) {
- throw new RuntimeException("Failed to initialize the write operation.", e);
- }
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- try {
- writer = writeOperation.createWriter(serializedOptions.getPipelineOptions());
- } catch (Exception e) {
- throw new IOException("Couldn't create writer.", e);
- }
- try {
- writer.open(uid + "-" + String.valueOf(taskNumber));
- } catch (Exception e) {
- throw new IOException("Couldn't open writer.", e);
- }
- }
-
- @Override
- public void writeRecord(WindowedValue<T> record) throws IOException {
- try {
- writer.write(record.getValue());
- } catch (Exception e) {
- throw new IOException("Couldn't write record.", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- try {
- writer.close();
- } catch (Exception e) {
- throw new IOException("Couldn't close writer.", e);
- }
- }
-
-}
[3/3] incubator-beam git commit: This closes #396 and #397
Posted by al...@apache.org.
This closes #396 and #397
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cd64bb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cd64bb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cd64bb1
Branch: refs/heads/master
Commit: 1cd64bb1a7c24db8e7a8e5ecdd2971a4611e3620
Parents: 36a27f5 c24d5ff
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon May 30 14:23:45 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 30 14:23:45 2016 +0200
----------------------------------------------------------------------
runners/flink/examples/pom.xml | 6 +
runners/flink/runner/pom.xml | 14 +-
.../FlinkBatchTransformTranslators.java | 131 +------------------
.../translation/wrappers/SinkOutputFormat.java | 97 --------------
4 files changed, 12 insertions(+), 236 deletions(-)
----------------------------------------------------------------------