You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/13 10:04:59 UTC

incubator-flink git commit: [FLINK-1226] Add API support for passing Configurations to InputFormats

Repository: incubator-flink
Updated Branches:
  refs/heads/master 21b1b975c -> b3c290f68


[FLINK-1226] Add API support for passing Configurations to InputFormats

This closes #196


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3c290f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3c290f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3c290f6

Branch: refs/heads/master
Commit: b3c290f6860ebbea0f2931ff8398e6877fc49943
Parents: 21b1b97
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 11 15:02:16 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 13 10:04:29 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/operators/DataSink.java      |  25 +++-
 .../flink/api/java/operators/DataSource.java    |  22 +++-
 .../test/javaApiOperators/DataSourceITCase.java | 131 +++++++++++++++++++
 3 files changed, 176 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 2b5aa31..ee0129c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.api.java.DataSet;
 
@@ -39,7 +40,9 @@ public class DataSink<T> {
 	private String name;
 	
 	private int dop = -1;
-	
+
+	private Configuration parameters;
+
 	public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type) {
 		if (format == null) {
 			throw new IllegalArgumentException("The output format must not be null.");
@@ -69,6 +72,22 @@ public class DataSink<T> {
 	public DataSet<T> getDataSet() {
 		return data;
 	}
+
+	/**
+	 * Passes a configuration to the OutputFormat. The object is stored by reference; its entries are
+	 * copied into the parameters of the GenericDataSinkBase that is built for this sink.
+	 * @param parameters Configuration parameters; {@code null} means no parameters are added
+	 * @return this DataSink, to allow call chaining
+	 */
+	public DataSink<T> withParameters(Configuration parameters) {
+		this.parameters = parameters;
+		return this;
+	}
+
+	/** @return the Configuration for the OutputFormat, or {@code null} if none has been set. */
+	public Configuration getParameters() {
+		return this.parameters;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -85,6 +104,10 @@ public class DataSink<T> {
 		GenericDataSinkBase<T> sink = new GenericDataSinkBase<T>(this.format, new UnaryOperatorInformation<T, Nothing>(this.type, new NothingTypeInfo()), name);
 		// set input
 		sink.setInput(input);
+		// set parameters
+		if(this.parameters != null) {
+			sink.getParameters().addAll(this.parameters);
+		}
 		// set dop
 		if(this.dop > 0) {
 			// use specified dop

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 739620b..764803f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * An operation that creates a new data set (data source). The operation acts as the
@@ -35,6 +36,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	
 	private final InputFormat<OUT, ?> inputFormat;
+	private Configuration parameters;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -68,6 +70,22 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 		return this.inputFormat;
 	}
 	
+	/**
+	 * Passes a configuration to the InputFormat. The object is stored by reference; its entries are
+	 * copied into the data source's operator parameters only when the program is translated.
+	 * @param parameters Configuration parameters; {@code null} means no parameters are added
+	 * @return this DataSource, to allow call chaining
+	 */
+	public DataSource<OUT> withParameters(Configuration parameters) {
+		this.parameters = parameters;
+		return this;
+	}
+	
+	/** @return the Configuration for the InputFormat, or {@code null} if none has been set. */
+	public Configuration getParameters() {
+		return this.parameters;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
@@ -80,7 +98,9 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 		GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
 				new OperatorInformation<OUT>(getType()), name);
 		source.setDegreeOfParallelism(dop);
-		
+		if(this.parameters != null) {
+			source.getParameters().addAll(this.parameters);
+		}
 		return source;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3c290f6/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
new file mode 100644
index 0000000..d645bc6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Integration tests verifying that a {@link Configuration} attached to a data source via
+ * {@code withParameters(...)} is handed to the input format's {@code configure(...)} method.
+ */
+@RunWith(Parameterized.class)
+public class DataSourceITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 1;
+
+	private final int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String inputPath;
+	private String expectedResult;
+
+	public DataSourceITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("input", "ab\n"
+				+ "cd\n"
+				+ "ef\n");
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = DataSourceProgs.runProgram(curProgId, inputPath, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	/** Builds one test configuration per program; each carries its id under the "ProgramId" key. */
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		LinkedList<Configuration> configs = new LinkedList<Configuration>();
+		for (int id = 1; id <= NUM_PROGRAMS; id++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", id);
+			configs.add(config);
+		}
+
+		return toParameterList(configs);
+	}
+
+	/** Text input format that asserts the "prepend" parameter reaches {@link #configure(Configuration)}. */
+	private static class TestInputFormat extends TextInputFormat {
+		private static final long serialVersionUID = 1L;
+
+		public TestInputFormat(Path filePath) {
+			super(filePath);
+		}
+
+		@Override
+		public void configure(Configuration parameters) {
+			super.configure(parameters);
+			// one assertion covers both the missing (null) and the wrong-value case
+			Assert.assertEquals("The 'prepend' parameter did not reach the input format with the expected value.",
+					"test", parameters.getString("prepend", null));
+		}
+	}
+
+	/** Holder for the test programs, selected by program id. */
+	private static final class DataSourceProgs {
+		private DataSourceProgs() {}
+
+		/** Runs the program with the given id and returns the output it is expected to produce. */
+		public static String runProgram(int progId, String inputPath, String resultPath) throws Exception {
+			switch (progId) {
+			case 1: {
+				/*
+				 * Test passing a configuration object to an input format
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				Configuration ifConf = new Configuration();
+				ifConf.setString("prepend", "test");
+				DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
+				ds.writeAsText(resultPath);
+				env.execute();
+
+				// TestInputFormat only adds a check in configure(), so the output equals the input file
+				return "ab\n"
+					+ "cd\n"
+					+ "ef\n";
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id: " + progId);
+			}
+		}
+	}
+}