You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/15 07:47:33 UTC
[2/2] flink git commit: [FLINK-4108] [scala] Respect
ResultTypeQueryable for InputFormats.
[FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats.
This closes #2619
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f726980
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f726980
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f726980
Branch: refs/heads/release-1.1
Commit: 9f7269808f3694815bba1e4dbf050db2a2dfe15f
Parents: 9591d50
Author: twalthr <tw...@apache.org>
Authored: Tue Oct 11 11:19:32 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 15 08:04:55 2016 +0200
----------------------------------------------------------------------
.../flink/api/scala/ExecutionEnvironment.scala | 6 +--
.../org/apache/flink/api/scala/package.scala | 11 +++-
.../scala/typeutils/TypeExtractionTest.scala | 53 ++++++++++++++++++++
3 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index f03cb84..4f9d569 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -110,7 +110,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
*/
@PublicEvolving
def getRestartStrategy: RestartStrategyConfiguration = {
- javaEnv.getRestartStrategy()
+ javaEnv.getRestartStrategy
}
/**
@@ -381,7 +381,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
require(inputFormat != null, "InputFormat must not be null.")
require(filePath != null, "File path must not be null.")
inputFormat.setFilePath(new Path(filePath))
- createInput(inputFormat, implicitly[TypeInformation[T]])
+ createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
}
/**
@@ -392,7 +392,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.")
}
- createInput(inputFormat, implicitly[TypeInformation[T]])
+ createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index e5ca465..6096388 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -21,8 +21,9 @@ package org.apache.flink.api
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils, ScalaNothingTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, ScalaNothingTypeInfo, TypeUtils}
import _root_.scala.reflect.ClassTag
import language.experimental.macros
@@ -52,6 +53,14 @@ package object scala {
// We need to wrap Java DataSet because we need the scala operations
private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set)
+ // Checks if object has explicit type information using ResultTypeQueryable
+ private[flink] def explicitFirst[T](
+ funcOrInputFormat: AnyRef,
+ typeInfo: TypeInformation[T]): TypeInformation[T] = funcOrInputFormat match {
+ case rtq: ResultTypeQueryable[T] => rtq.getProducedType
+ case _ => typeInfo
+ }
+
private[flink] def fieldNames2Indices(
typeInfo: TypeInformation[_],
fields: Array[String]): Array[Int] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f726980/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
new file mode 100644
index 0000000..0462ffa
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeExtractionTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.io.FileInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TypeExtractionTest.CustomTypeInputFormat
+import org.apache.flink.util.TestLogger
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+
+class TypeExtractionTest extends TestLogger with JUnitSuiteLike {
+
+ @Test
+ def testResultTypeQueryable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val productedType = env.createInput(new CustomTypeInputFormat).getType()
+ assertEquals(productedType, BasicTypeInfo.LONG_TYPE_INFO)
+ }
+
+}
+
+object TypeExtractionTest {
+ class CustomTypeInputFormat extends FileInputFormat[String] with ResultTypeQueryable[Long] {
+
+ override def getProducedType: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+
+ override def reachedEnd(): Boolean = throw new UnsupportedOperationException()
+
+ override def nextRecord(reuse: String): String = throw new UnsupportedOperationException()
+ }
+}