You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/13 10:04:59 UTC
incubator-flink git commit: [FLINK-1226] Add API support for passing
Configurations to InputFormats
Repository: incubator-flink
Updated Branches:
refs/heads/master 21b1b975c -> b3c290f68
[FLINK-1226] Add API support for passing Configurations to InputFormats
This closes #196
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3c290f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3c290f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3c290f6
Branch: refs/heads/master
Commit: b3c290f6860ebbea0f2931ff8398e6877fc49943
Parents: 21b1b97
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 11 15:02:16 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 13 10:04:29 2014 +0100
----------------------------------------------------------------------
.../flink/api/java/operators/DataSink.java | 25 +++-
.../flink/api/java/operators/DataSource.java | 22 +++-
.../test/javaApiOperators/DataSourceITCase.java | 131 +++++++++++++++++++
3 files changed, 176 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 2b5aa31..ee0129c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Nothing;
import org.apache.flink.api.java.DataSet;
@@ -39,7 +40,9 @@ public class DataSink<T> {
private String name;
private int dop = -1;
-
+
+ private Configuration parameters;
+
public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type) {
if (format == null) {
throw new IllegalArgumentException("The output format must not be null.");
@@ -69,6 +72,22 @@ public class DataSink<T> {
/**
 * @return the data set stored in this sink's {@code data} field
 */
public DataSet<T> getDataSet() {
return data;
}
+
+ /**
+ * Passes a configuration to the OutputFormat.
+ * <p>
+ * The given object is stored by reference, not copied. When the
+ * GenericDataSinkBase for this sink is built, a non-null configuration is
+ * added to that operator's parameters; a null value is skipped.
+ *
+ * @param parameters Configuration parameters
+ * @return this DataSink, so that calls can be chained
+ */
+ public DataSink<T> withParameters(Configuration parameters) {
+ this.parameters = parameters;
+ return this;
+ }
+
+ /**
+ * Returns the configuration object last handed to
+ * {@link #withParameters(Configuration)}, by reference.
+ *
+ * @return Configuration for the OutputFormat. The field has no initializer,
+ * so this is presumably null until withParameters has been called.
+ */
+ public Configuration getParameters() {
+ return this.parameters;
+ }
// --------------------------------------------------------------------------------------------
@@ -85,6 +104,10 @@ public class DataSink<T> {
GenericDataSinkBase<T> sink = new GenericDataSinkBase<T>(this.format, new UnaryOperatorInformation<T, Nothing>(this.type, new NothingTypeInfo()), name);
// set input
sink.setInput(input);
+ // set parameters
+ if(this.parameters != null) {
+ sink.getParameters().addAll(this.parameters);
+ }
// set dop
if(this.dop > 0) {
// use specified dop
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 739620b..764803f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
/**
* An operation that creates a new data set (data source). The operation acts as the
@@ -35,6 +36,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
private final InputFormat<OUT, ?> inputFormat;
+ private Configuration parameters;
// --------------------------------------------------------------------------------------------
@@ -68,6 +70,22 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
return this.inputFormat;
}
+ /**
+ * Passes a configuration to the InputFormat.
+ * <p>
+ * The given object is stored by reference, not copied. In
+ * translateToDataFlow() a non-null configuration is added to the parameters
+ * of the created GenericDataSourceBase; a null value is skipped.
+ *
+ * @param parameters Configuration parameters
+ * @return this DataSource, so that calls can be chained
+ */
+ public DataSource<OUT> withParameters(Configuration parameters) {
+ this.parameters = parameters;
+ return this;
+ }
+
+ /**
+ * Returns the configuration object last handed to
+ * {@link #withParameters(Configuration)}, by reference.
+ *
+ * @return Configuration for the InputFormat. The field has no initializer,
+ * so this is presumably null until withParameters has been called.
+ */
+ public Configuration getParameters() {
+ return this.parameters;
+ }
+
// --------------------------------------------------------------------------------------------
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
@@ -80,7 +98,9 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
new OperatorInformation<OUT>(getType()), name);
source.setDegreeOfParallelism(dop);
-
+ if(this.parameters != null) {
+ source.getParameters().addAll(this.parameters);
+ }
return source;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
new file mode 100644
index 0000000..d645bc6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for the DataSource.
+ *
+ * The test is parameterized: getConfigurations() produces one Configuration
+ * per program id (1 to NUM_PROGRAMS), stored under the key "ProgramId", and
+ * each instance reads its id back from that configuration.
+ *
+ * The hooks overridden from JavaProgramTestBase do the following
+ * (presumably called in this order by the base class):
+ * - preSubmit() writes a three-line temp input file ("ab", "cd", "ef") and
+ *   picks a temp result path.
+ * - testProgram() runs the selected program through DataSourceProgs and
+ *   keeps the expected result it returns.
+ * - postSubmit() compares that expected result with the content found at
+ *   the result path, line by line.
+ *
+ * TestInputFormat is a TextInputFormat whose configure() asserts that the
+ * parameter "prepend" is present and equal to "test", i.e. that the
+ * configuration given to DataSource.withParameters() reached the format.
+ */
+
+@RunWith(Parameterized.class)
+public class DataSourceITCase extends JavaProgramTestBase {
+
+ private static int NUM_PROGRAMS = 1;
+
+ private int curProgId = config.getInteger("ProgramId", -1);
+ private String resultPath;
+ private String inputPath;
+ private String expectedResult;
+
+ public DataSourceITCase(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ inputPath = createTempFile("input", "ab\n"
+ + "cd\n"
+ + "ef\n");
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = DataSourceProgs.runProgram(curProgId, inputPath, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+
+ private static class TestInputFormat extends TextInputFormat {
+ private static final long serialVersionUID = 1L;
+
+ public TestInputFormat(Path filePath) {
+ super(filePath);
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ super.configure(parameters);
+
+ Assert.assertNotNull(parameters.getString("prepend", null));
+ Assert.assertEquals("test", parameters.getString("prepend", null));
+ }
+
+ }
+ private static class DataSourceProgs {
+
+ public static String runProgram(int progId, String inputPath, String resultPath) throws Exception {
+
+ switch(progId) {
+ case 1: {
+ /*
+ * Test passing a configuration object to an input format
+ *
+ * The configuration carries "prepend" = "test" and is attached to the
+ * source with withParameters(); TestInputFormat.configure() asserts
+ * that it sees this value. The input is read as text and written
+ * to resultPath, so the expected result equals the input lines.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Configuration ifConf = new Configuration();
+ ifConf.setString("prepend", "test");
+
+ DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
+ ds.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "ab\n"
+ + "cd\n"
+ + "ef\n";
+ }
+ default:
+ throw new IllegalArgumentException("Invalid program id");
+ }
+ }
+ }
+}