You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/15 07:47:32 UTC

[1/2] flink git commit: [FLINK-4506] [DataSet] Fix documentation of CsvOutputFormat about incorrect default of allowNullValues

Repository: flink
Updated Branches:
  refs/heads/release-1.1 9a9fb1066 -> 9f7269808


[FLINK-4506] [DataSet] Fix documentation of CsvOutputFormat about incorrect default of allowNullValues

- Add test case for CsvOutputFormat

This closes #2477
This closes #2631


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9591d50f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9591d50f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9591d50f

Branch: refs/heads/release-1.1
Commit: 9591d50fc6f3b2d7fab3af7aeee29e04d11b7811
Parents: 9a9fb10
Author: Kirill Morozov <ki...@epam.com>
Authored: Wed Sep 7 12:43:26 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 15 08:02:25 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/CsvOutputFormat.java      |  2 +-
 .../flink/api/java/io/CsvOutputFormatTest.java  | 79 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9591d50f/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index dc20620..703128f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -122,7 +122,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	 * Configures the format to either allow null values (writing an empty field),
 	 * or to throw an exception when encountering a null field.
 	 * <p>
-	 * by default, null values are allowed.
+	 * by default, null values are disallowed.
 	 *
 	 * @param allowNulls Flag to indicate whether the output format should accept null values.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9591d50f/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
new file mode 100644
index 0000000..006f940
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class CsvOutputFormatTest {
+
+	private String path = null;
+
+	@Before
+	public void createFile() throws Exception {
+		path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
+	}
+
+	@Test
+	public void testNullAllow() throws Exception {
+
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.setAllowNullValues(true);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+
+		java.nio.file.Path p = Paths.get(path);
+		Assert.assertTrue(Files.exists(p));
+		List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
+		Assert.assertEquals(1, lines.size());
+		Assert.assertEquals("One,,8", lines.get(0));
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testNullDisallowOnDefault() throws Exception {
+		CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+		csvOutputFormat.open(0, 1);
+		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		csvOutputFormat.close();
+	}
+
+	@After
+	public void cleanUp() throws IOException {
+		Files.deleteIfExists(Paths.get(path));
+	}
+
+}


[2/2] flink git commit: [FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats.

Posted by fh...@apache.org.
[FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats.

This closes #2619


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f726980
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f726980
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f726980

Branch: refs/heads/release-1.1
Commit: 9f7269808f3694815bba1e4dbf050db2a2dfe15f
Parents: 9591d50
Author: twalthr <tw...@apache.org>
Authored: Tue Oct 11 11:19:32 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 15 08:04:55 2016 +0200

----------------------------------------------------------------------
 .../flink/api/scala/ExecutionEnvironment.scala  |  6 +--
 .../org/apache/flink/api/scala/package.scala    | 11 +++-
 .../scala/typeutils/TypeExtractionTest.scala    | 53 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index f03cb84..4f9d569 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -110,7 +110,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     */
   @PublicEvolving
   def getRestartStrategy: RestartStrategyConfiguration = {
-    javaEnv.getRestartStrategy()
+    javaEnv.getRestartStrategy
   }
 
   /**
@@ -381,7 +381,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     require(inputFormat != null, "InputFormat must not be null.")
     require(filePath != null, "File path must not be null.")
     inputFormat.setFilePath(new Path(filePath))
-    createInput(inputFormat, implicitly[TypeInformation[T]])
+    createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
   }
 
   /**
@@ -392,7 +392,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     if (inputFormat == null) {
       throw new IllegalArgumentException("InputFormat must not be null.")
     }
-    createInput(inputFormat, implicitly[TypeInformation[T]])
+    createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index e5ca465..6096388 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -21,8 +21,9 @@ package org.apache.flink.api
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils, ScalaNothingTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, ScalaNothingTypeInfo, TypeUtils}
 
 import _root_.scala.reflect.ClassTag
 import language.experimental.macros
@@ -52,6 +53,14 @@ package object scala {
   // We need to wrap Java DataSet because we need the scala operations
   private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set)
 
+  // Checks if object has explicit type information using ResultTypeQueryable
+  private[flink] def explicitFirst[T](
+      funcOrInputFormat: AnyRef,
+      typeInfo: TypeInformation[T]): TypeInformation[T] = funcOrInputFormat match {
+    case rtq: ResultTypeQueryable[T] => rtq.getProducedType
+    case _ => typeInfo
+  }
+
   private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
new file mode 100644
index 0000000..0462ffa
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.io.FileInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TypeExtractionTest.CustomTypeInputFormat
+import org.apache.flink.util.TestLogger
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+
+class TypeExtractionTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testResultTypeQueryable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val productedType = env.createInput(new CustomTypeInputFormat).getType()
+    assertEquals(productedType, BasicTypeInfo.LONG_TYPE_INFO)
+  }
+
+}
+
+object TypeExtractionTest {
+  class CustomTypeInputFormat extends FileInputFormat[String] with ResultTypeQueryable[Long] {
+
+    override def getProducedType: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+
+    override def reachedEnd(): Boolean = throw new UnsupportedOperationException()
+
+    override def nextRecord(reuse: String): String = throw new UnsupportedOperationException()
+  }
+}