You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/09 12:21:34 UTC
[07/12] git commit: Add dedicated test for the type extraction on
input formats.
Add dedicated test for the type extraction on input formats.
Fix various JavaDoc errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ce65fb61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ce65fb61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ce65fb61
Branch: refs/heads/master
Commit: ce65fb6161321c72310d3ba8c52b31ec557233e9
Parents: 5cec8e8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 8 17:24:43 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 9 03:09:36 2014 +0200
----------------------------------------------------------------------
.../io/AvroInputFormatTypeExtractionTest.java | 77 +++++++
.../stratosphere/spargel/java/OutgoingEdge.java | 2 +-
.../eu/stratosphere/client/CliFrontend.java | 4 +-
.../eu/stratosphere/client/LocalExecutor.java | 6 +-
.../eu/stratosphere/client/program/Client.java | 1 -
.../client/program/PackagedProgram.java | 20 +-
.../client/web/WebInterfaceServer.java | 4 +-
.../stratosphere/api/common/io/InputFormat.java | 2 +-
.../type/extractor/PojoTypeInformationTest.java | 6 +
.../TypeExtractorInputFormatsTest.java | 230 +++++++++++++++++++
10 files changed, 330 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/io/AvroInputFormatTypeExtractionTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/io/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..66467fa
--- /dev/null
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,77 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.io;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.typeutils.PojoTypeInfo;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.types.TypeInformation;
+
+public class AvroInputFormatTypeExtractionTest { // Checks that the type extracted for an AvroInputFormat over MyAvroType is a PojoTypeInfo for that class.
+
+ @Test
+ public void testTypeExtraction() { // Extracts the type twice (directly, and via a DataSet built from the same format) and checks both results.
+ try {
+ InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); // placeholder path; this test body never opens the format
+
+ TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); // route 1: ask the TypeExtractor directly
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<MyAvroType> input = env.createInput(format);
+ TypeInformation<?> typeInfoDataSet = input.getType(); // route 2: the type reported by the DataSet created from the format
+
+
+ Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
+ Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+
+ Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
+ Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+ }
+ catch (Exception e) { // any exception is reported as a test failure carrying the exception message
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ public static final class MyAvroType { // Record type used by the test: one public field and one private field, each with a getter and a setter.
+
+ public String theString;
+
+ private double aDouble;
+
+ public double getaDouble() {
+ return aDouble;
+ }
+
+ public void setaDouble(double aDouble) {
+ this.aDouble = aDouble;
+ }
+
+ public void setTheString(String theString) {
+ this.theString = theString;
+ }
+
+ public String getTheString() {
+ return theString;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/OutgoingEdge.java b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/OutgoingEdge.java
index 9abe0d7..0a63e56 100644
--- a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/OutgoingEdge.java
+++ b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/OutgoingEdge.java
@@ -16,7 +16,7 @@ package eu.stratosphere.spargel.java;
* <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
* vertex id. Edges may have an associated value (for example a weight or a distance), if the
* graph algorithm was initialized with the
- * {@link VertexCentricIteration#withEdgesWithValue(eu.stratosphere.api.java.DataSet, VertexUpdateFunction, MessagingFunction)}
+ * {@link VertexCentricIteration#withValuedEdges(eu.stratosphere.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
* method.
*
* @param <VertexKey> The type of the vertex key.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
index a3f73db..7d97143 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
@@ -815,8 +815,6 @@ public class CliFrontend {
/**
* Prints the help for the client.
- *
- * @param options A map with options for actions.
*/
private void printHelp() {
System.out.println("./stratosphere <ACTION> [GENERAL_OPTIONS] [ARGUMENTS]");
@@ -878,7 +876,7 @@ public class CliFrontend {
/**
* Displays exceptions.
*
- * @param e the exception to display.
+ * @param t The exception to display.
*/
private int handleError(Throwable t) {
System.out.println("Error: " + t.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
index 0f50f96..6822e10 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
@@ -285,7 +285,7 @@ public class LocalExecutor extends PlanExecutor {
/**
* Executes the program represented by the given Pact plan.
*
- * @param pa The program's plan.
+ * @param plan The program's plan.
* @return The net runtime of the program, in milliseconds.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
@@ -325,7 +325,9 @@ public class LocalExecutor extends PlanExecutor {
/**
* Return unoptimized plan as JSON.
- * @return
+ *
+ * @param plan The program plan.
+ * @return The plan as a JSON object.
*/
public static String getPlanAsJSON(Plan plan) {
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index ec66f4a..16dfd60 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -270,7 +270,6 @@ public class Client {
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
- * @throws JobInstantiationException Thrown, if the plan assembler function causes an exception.
*/
public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
return run(getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
index edf36b3..9fe828a 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
@@ -102,7 +102,7 @@ public class PackagedProgram {
*
* @param jarFile
* The jar file which contains the plan.
- * @param className
+ * @param entryPointClassName
* Name of the class which generates the plan. Overrides the class defined
* in the jar file manifest
* @param args
@@ -177,7 +177,8 @@ public class PackagedProgram {
/**
* Returns the plan with all required jars.
- * @throws JobInstantiationException
+ *
+ * @return The plan with attached jar files.
* @throws ProgramInvocationException
*/
public JobWithJars getPlanWithJars() throws ProgramInvocationException {
@@ -261,9 +262,6 @@ public class PackagedProgram {
* @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
- * @throws JobInstantiationException
- * Thrown if an error occurred in the user-provided pact assembler. This may indicate
- * missing parameters for generation.
*/
public String getDescription() throws ProgramInvocationException {
if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
@@ -487,18 +485,16 @@ public class PackagedProgram {
* or it is read from the manifest of the jar. The assembler is handed the given options
* for its assembly.
*
- * @param clazz
- * The name of the assembler class, or null, if the class should be read from
- * the manifest.
+ * @param program The program to create the plan for.
* @param options
* The options for the assembler.
* @return The plan created by the program.
- * @throws JobInstantiationException
+ * @throws ProgramInvocationException
* Thrown, if an error occurred in the user-provided pact assembler.
*/
- private static Plan createPlanFromProgram(Program assembler, String[] options) throws ProgramInvocationException {
+ private static Plan createPlanFromProgram(Program program, String[] options) throws ProgramInvocationException {
try {
- return assembler.getPlan(options);
+ return program.getPlan(options);
} catch (Throwable t) {
throw new ProgramInvocationException("Error while calling the program: " + t.getMessage(), t);
}
@@ -509,7 +505,7 @@ public class PackagedProgram {
* to the system's temp directory.
*
* @return The file names of the extracted temporary files.
- * @throws IOException Thrown, if the extraction process failed.
+ * @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
private static List<File> extractContainedLibaries(File jarFile) throws ProgramInvocationException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-clients/src/main/java/eu/stratosphere/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/web/WebInterfaceServer.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/web/WebInterfaceServer.java
index 7fe8abb..587bfb6 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/web/WebInterfaceServer.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/web/WebInterfaceServer.java
@@ -247,8 +247,8 @@ public class WebInterfaceServer {
* if the directory exists and creates it if necessary. It also checks read permissions and
* write permission, if necessary.
*
- * @param dir
- * The String describing the directory path.
+ * @param f
+ * The file describing the directory path.
* @param needWritePermission
* A flag indicating whether to check write access.
* @throws IOException
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InputFormat.java
index ce3cd95..247ac10 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InputFormat.java
@@ -88,7 +88,7 @@ public interface InputFormat<OT, T extends InputSplit> extends Serializable {
* instances may remain idle.
* @return The splits of this input that can be processed in parallel.
*
- * @throws Exception Thrown, when the creation of the splits was erroneous.
+ * @throws IOException Thrown, when the creation of the splits was erroneous.
*/
T[] createInputSplits(int minNumSplits) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/PojoTypeInformationTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/PojoTypeInformationTest.java
index 772050a..6907077 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/PojoTypeInformationTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/PojoTypeInformationTest.java
@@ -84,4 +84,10 @@ public class PojoTypeInformationTest {
TypeInformation<Recursive1Pojo> type = TypeExtractor.getForClass(Recursive1Pojo.class);
assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
}
+
+ @Test
+ public void testRecursivePojoObjectTypeExtraction() { // Same PojoTypeInfo check as the getForClass test above, but the type is extracted from an instance via getForObject.
+ TypeInformation<Recursive1Pojo> type = TypeExtractor.getForObject(new Recursive1Pojo());
+ assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce65fb61/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/TypeExtractorInputFormatsTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/TypeExtractorInputFormatsTest.java
new file mode 100644
index 0000000..d2c3642
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/type/extractor/TypeExtractorInputFormatsTest.java
@@ -0,0 +1,230 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.type.extractor;
+
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.io.GenericInputFormat;
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.common.io.statistics.BaseStatistics;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.ResultTypeQueryable;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.types.TypeInformation;
+
+@SuppressWarnings("serial")
+public class TypeExtractorInputFormatsTest { // Tests TypeExtractor.getInputFormatTypes against the stub input formats declared at the bottom of this class.
+
+ @Test
+ public void testExtractInputFormatType() { // Format that implements InputFormat<Float, InputSplit> directly; FLOAT_TYPE_INFO is expected.
+ try {
+ InputFormat<?, ?> format = new DummyFloatInputFormat();
+ TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+ assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, typeInfo);
+ }
+ catch (Exception e) { // any exception is reported as a test failure carrying the exception message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testExtractDerivedInputFormatType() { // Formats that extend GenericInputFormat with a concrete type argument (a basic type, then a tuple type).
+ try {
+ // simple type
+ {
+ InputFormat<?, ?> format = new DerivedInputFormat();
+ TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+ assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo);
+ }
+
+ // composite type
+ {
+ InputFormat<?, ?> format = new DerivedTupleInputFormat();
+ TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+ assertTrue(typeInfo.isTupleType());
+ assertTrue(typeInfo instanceof TupleTypeInfo);
+
+ @SuppressWarnings("unchecked")
+ TupleTypeInfo<Tuple3<String, Short, Double>> tupleInfo = (TupleTypeInfo<Tuple3<String, Short, Double>>) typeInfo; // reached only after the instanceof assertion above has passed
+
+ assertEquals(3, tupleInfo.getArity());
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+ assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tupleInfo.getTypeAt(1));
+ assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+ }
+ }
+ catch (Exception e) { // any exception is reported as a test failure carrying the exception message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiLevelDerivedInputFormatType() { // The middle tuple field is bound one level further down: FinalRelativeInputFormat extends RelativeInputFormat<Integer>.
+ try {
+
+ // composite type
+ {
+ InputFormat<?, ?> format = new FinalRelativeInputFormat();
+ TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+ assertTrue(typeInfo.isTupleType());
+ assertTrue(typeInfo instanceof TupleTypeInfo);
+
+ @SuppressWarnings("unchecked")
+ TupleTypeInfo<Tuple3<String, Integer, Double>> tupleInfo = (TupleTypeInfo<Tuple3<String, Integer, Double>>) typeInfo; // reached only after the instanceof assertion above has passed
+
+ assertEquals(3, tupleInfo.getArity());
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+ assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleInfo.getTypeAt(1));
+ assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+ }
+ }
+ catch (Exception e) { // any exception is reported as a test failure carrying the exception message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testQueryableFormatType() { // The format is declared as InputFormat<Float, ...> but its getProducedType() returns DOUBLE_TYPE_INFO; the test expects DOUBLE_TYPE_INFO.
+ try {
+ InputFormat<?, ?> format = new QueryableInputFormat();
+ TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+ assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, typeInfo);
+ }
+ catch (Exception e) { // any exception is reported as a test failure carrying the exception message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test formats
+ // --------------------------------------------------------------------------------------------
+
+ public static final class DummyFloatInputFormat implements InputFormat<Float, InputSplit> { // Stub format: every method is empty or returns null/false.
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+ @Override
+ public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+ @Override
+ public Class<? extends InputSplit> getInputSplitType() { return null; }
+
+ @Override
+ public void open(InputSplit split) {}
+
+ @Override
+ public boolean reachedEnd() { return false; }
+
+ @Override
+ public Float nextRecord(Float reuse) throws IOException { return null; }
+
+ @Override
+ public void close() {}
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class DerivedInputFormat extends GenericInputFormat<Short> { // Stub format that fixes the GenericInputFormat type argument to Short.
+
+ @Override
+ public boolean reachedEnd() { return false; }
+
+ @Override
+ public Short nextRecord(Short reuse) { return null; }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class DerivedTupleInputFormat extends GenericInputFormat<Tuple3<String, Short, Double>> { // Stub format that fixes the type argument to a three-field tuple.
+
+ @Override
+ public boolean reachedEnd() { return false; }
+
+ @Override
+ public Tuple3<String, Short, Double> nextRecord(Tuple3<String, Short, Double> reuse) { return null; }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class RelativeInputFormat<T> extends GenericInputFormat<Tuple3<String, T, Double>> { // Non-final stub that leaves the middle tuple field as the type variable T.
+
+ @Override
+ public boolean reachedEnd() { return false; }
+
+ @Override
+ public Tuple3<String, T, Double> nextRecord(Tuple3<String, T, Double> reuse) { return null; }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class FinalRelativeInputFormat extends RelativeInputFormat<Integer> { // Binds T of RelativeInputFormat to Integer.
+
+ @Override
+ public Tuple3<String, Integer, Double> nextRecord(Tuple3<String, Integer, Double> reuse) { return null; }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class QueryableInputFormat implements InputFormat<Float, InputSplit>, ResultTypeQueryable<Double> { // Stub whose InputFormat type argument (Float) differs from the type returned by getProducedType() (Double).
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+ @Override
+ public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+ @Override
+ public Class<? extends InputSplit> getInputSplitType() { return null; }
+
+ @Override
+ public void open(InputSplit split) {}
+
+ @Override
+ public boolean reachedEnd() { return false; }
+
+ @Override
+ public Float nextRecord(Float reuse) throws IOException { return null; }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public TypeInformation<Double> getProducedType() {
+ return BasicTypeInfo.DOUBLE_TYPE_INFO;
+ }
+ }
+}