Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:39 UTC
[49/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index 02c11af..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-[
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "Address",
- "fields": [
- {"name": "num", "type": "int"},
- {"name": "street", "type": "string"},
- {"name": "city", "type": "string"},
- {"name": "state", "type": "string"},
- {"name": "zip", "type": "string"}
- ]
-},
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "name", "type": "string"},
- {"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_color", "type": ["string", "null"]},
- {"name": "type_long_test", "type": ["long", "null"]},
- {"name": "type_double_test", "type": "double"},
- {"name": "type_null_test", "type": ["null"]},
- {"name": "type_bool_test", "type": ["boolean"]},
- {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
- {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
- {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
- {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
- {"name": "type_map", "type": {"type": "map", "values": "long"}},
- {"name": "type_fixed",
- "size": 16,
- "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
- {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
- {"name": "type_nested", "type": ["null", "Address"]}
- ]
-}]
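
For reference (not part of this commit): the removed user.avsc schema is the test input for flink-avro. A minimal sketch of reading such Avro records with Flink's AvroInputFormat, assuming the Avro-generated User class from the schema above and an illustrative file path:

    // Sketch only: reads Avro User records with the flink-avro AvroInputFormat.
    // The path and the generated User class are assumptions, not part of this diff.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.api.io.avro.generated.User;
    import org.apache.flink.core.fs.Path;

    public class AvroUserReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Wrap the Avro file as a Flink input format for the generated User type.
            AvroInputFormat<User> format =
                new AvroInputFormat<>(new Path("file:///tmp/testdata.avro"), User.class);

            DataSet<User> users = env.createInput(format);
            users.print();
        }
    }
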
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
deleted file mode 100644
index 8f423d9..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-batch-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-hadoop-compatibility_2.10</artifactId>
- <name>flink-hadoop-compatibility</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-hadoop2</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- </dependencies>
-
-
- <build>
- <plugins>
- <!-- activate API compatibility checks -->
- <plugin>
- <groupId>com.github.siom79.japicmp</groupId>
- <artifactId>japicmp-maven-plugin</artifactId>
- </plugin>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Scala Code Style, most of the configuration done via plugin management -->
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <configuration>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
deleted file mode 100644
index 7bcb4bf..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
- * interface defines the serialization and deserialization routines for the data type.
- *
- * @param <T> The type of the class represented by this type information.
- */
-@Public
-public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Class<T> typeClass;
-
- @PublicEvolving
- public WritableTypeInfo(Class<T> typeClass) {
- this.typeClass = checkNotNull(typeClass);
-
- checkArgument(
- Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
- "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- @PublicEvolving
- public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
- if(Comparable.class.isAssignableFrom(typeClass)) {
- return new WritableComparator(sortOrderAscending, typeClass);
- }
- else {
- throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
- "Class does not implement Comparable interface.");
- }
- }
-
- @Override
- @PublicEvolving
- public boolean isBasicType() {
- return false;
- }
-
- @Override
- @PublicEvolving
- public boolean isTupleType() {
- return false;
- }
-
- @Override
- @PublicEvolving
- public int getArity() {
- return 1;
- }
-
- @Override
- @PublicEvolving
- public int getTotalFields() {
- return 1;
- }
-
- @Override
- @PublicEvolving
- public Class<T> getTypeClass() {
- return this.typeClass;
- }
-
- @Override
- @PublicEvolving
- public boolean isKeyType() {
- return Comparable.class.isAssignableFrom(typeClass);
- }
-
- @Override
- @PublicEvolving
- public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
- return new WritableSerializer<T>(typeClass);
- }
-
- @Override
- public String toString() {
- return "WritableType<" + typeClass.getName() + ">";
- }
-
- @Override
- public int hashCode() {
- return typeClass.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof WritableTypeInfo) {
- @SuppressWarnings("unchecked")
- WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
-
- return writableTypeInfo.canEqual(this) &&
- typeClass == writableTypeInfo.typeClass;
-
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof WritableTypeInfo;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @PublicEvolving
- static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
- if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
- return new WritableTypeInfo<T>(typeClass);
- }
- else {
- throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
- }
- }
-
-}
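
For reference (not part of this commit): a minimal sketch of a user-defined Writable that works with the WritableTypeInfo deleted above. The PageIdWritable class is illustrative; implementing Comparable is what makes isKeyType() return true and lets createComparator() succeed:

    // Sketch only: an illustrative Writable that can also serve as a Flink key.
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class PageIdWritable implements Writable, Comparable<PageIdWritable> {

        private long id;

        public PageIdWritable() {}                     // Writables need a no-arg constructor
        public PageIdWritable(long id) { this.id = id; }

        @Override public void write(DataOutput out) throws IOException { out.writeLong(id); }
        @Override public void readFields(DataInput in) throws IOException { id = in.readLong(); }
        @Override public int compareTo(PageIdWritable o) { return Long.compare(id, o.id); }

        public static void main(String[] args) {
            // Valid because PageIdWritable is a proper subclass of Writable;
            // comparators are available because Comparable is implemented.
            TypeInformation<PageIdWritable> info = new WritableTypeInfo<>(PageIdWritable.class);
            System.out.println(info);                  // prints WritableType<...PageIdWritable>
        }
    }
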
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
deleted file mode 100644
index 3a95d94..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-
- private static final long serialVersionUID = 1L;
-
- private Class<T> type;
-
- private final boolean ascendingComparison;
-
- private transient T reference;
-
- private transient T tempReference;
-
- private transient Kryo kryo;
-
- @SuppressWarnings("rawtypes")
- private final TypeComparator[] comparators = new TypeComparator[] {this};
-
- public WritableComparator(boolean ascending, Class<T> type) {
- this.type = type;
- this.ascendingComparison = ascending;
- }
-
- @Override
- public int hash(T record) {
- return record.hashCode();
- }
-
- @Override
- public void setReference(T toCompare) {
- checkKryoInitialized();
-
- reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
- }
-
- @Override
- public boolean equalToReference(T candidate) {
- return candidate.equals(reference);
- }
-
- @Override
- public int compareToReference(TypeComparator<T> referencedComparator) {
- T otherRef = ((WritableComparator<T>) referencedComparator).reference;
- int comp = otherRef.compareTo(reference);
- return ascendingComparison ? comp : -comp;
- }
-
- @Override
- public int compare(T first, T second) {
- int comp = first.compareTo(second);
- return ascendingComparison ? comp : -comp;
- }
-
- @Override
- public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
- ensureReferenceInstantiated();
- ensureTempReferenceInstantiated();
-
- reference.readFields(firstSource);
- tempReference.readFields(secondSource);
-
- int comp = reference.compareTo(tempReference);
- return ascendingComparison ? comp : -comp;
- }
-
- @Override
- public boolean supportsNormalizedKey() {
- return NormalizableKey.class.isAssignableFrom(type);
- }
-
- @Override
- public int getNormalizeKeyLen() {
- ensureReferenceInstantiated();
-
- NormalizableKey<?> key = (NormalizableKey<?>) reference;
- return key.getMaxNormalizedKeyLen();
- }
-
- @Override
- public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
- return keyBytes < getNormalizeKeyLen();
- }
-
- @Override
- public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
- NormalizableKey<?> key = (NormalizableKey<?>) record;
- key.copyNormalizedKey(target, offset, numBytes);
- }
-
- @Override
- public boolean invertNormalizedKey() {
- return !ascendingComparison;
- }
-
- @Override
- public TypeComparator<T> duplicate() {
- return new WritableComparator<T>(ascendingComparison, type);
- }
-
- @Override
- public int extractKeys(Object record, Object[] target, int index) {
- target[index] = record;
- return 1;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public TypeComparator[] getFlatComparators() {
- return comparators;
- }
-
- // --------------------------------------------------------------------------------------------
- // unsupported normalization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean supportsSerializationWithKeyNormalization() {
- return false;
- }
-
- @Override
- public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- // --------------------------------------------------------------------------------------------
-
- private void checkKryoInitialized() {
- if (this.kryo == null) {
- this.kryo = new Kryo();
-
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
- instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
-
- this.kryo.setAsmEnabled(true);
- this.kryo.register(type);
- }
- }
-
- private void ensureReferenceInstantiated() {
- if (reference == null) {
- reference = InstantiationUtil.instantiate(type, Writable.class);
- }
- }
-
- private void ensureTempReferenceInstantiated() {
- if (tempReference == null) {
- tempReference = InstantiationUtil.instantiate(type, Writable.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
deleted file mode 100644
index 9036d75..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Class<T> typeClass;
-
- private transient Kryo kryo;
-
- private transient T copyInstance;
-
- public WritableSerializer(Class<T> typeClass) {
- this.typeClass = typeClass;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T createInstance() {
- if(typeClass == NullWritable.class) {
- return (T) NullWritable.get();
- }
- return InstantiationUtil.instantiate(typeClass);
- }
-
-
-
- @Override
- public T copy(T from) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, kryo, this);
- }
-
- @Override
- public T copy(T from, T reuse) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, reuse, kryo, this);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(T record, DataOutputView target) throws IOException {
- record.write(target);
- }
-
- @Override
- public T deserialize(DataInputView source) throws IOException {
- return deserialize(createInstance(), source);
- }
-
- @Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
- reuse.readFields(source);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- ensureInstanceInstantiated();
- copyInstance.readFields(source);
- copyInstance.write(target);
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public WritableSerializer<T> duplicate() {
- return new WritableSerializer<T>(typeClass);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private void ensureInstanceInstantiated() {
- if (copyInstance == null) {
- copyInstance = createInstance();
- }
- }
-
- private void checkKryoInitialized() {
- if (this.kryo == null) {
- this.kryo = new Kryo();
-
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
- instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
-
- this.kryo.setAsmEnabled(true);
- this.kryo.register(typeClass);
- }
- }
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return this.typeClass.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof WritableSerializer) {
- WritableSerializer<?> other = (WritableSerializer<?>) obj;
-
- return other.canEqual(this) && typeClass == other.typeClass;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof WritableSerializer;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
deleted file mode 100644
index 9e8a3e4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-
-/**
- * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
- *
- * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
- * and {@link org.apache.hadoop.mapreduce.InputFormat}.
- *
- * Key value pairs produced by the Hadoop InputFormats are converted into Flink
- * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
- *
- */
-
-public final class HadoopInputs {
- // ----------------------------------- Hadoop Input Format ---------------------------------------
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
- */
- public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
- // set input path in JobConf
- org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
- // return wrapping InputFormat
- return createHadoopInput(mapredInputFormat, key, value, job);
- }
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
- */
- public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
- return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
- }
-
- /**
- * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
- *
- * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
- */
- public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
- return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
- }
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop InputFormat.
- */
- public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
- return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
- }
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
- */
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
- {
- // set input path in Job
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
- // return wrapping InputFormat
- return createHadoopInput(mapreduceInputFormat, key, value, job);
- }
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
- */
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
- {
- return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
- }
-
- /**
- * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
- *
- * @return A Flink InputFormat that wraps the Hadoop InputFormat.
- */
- public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
- org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
- {
- return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
- }
-}
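
For reference (not part of this commit): a minimal sketch of how the removed HadoopInputs helpers were typically used to read a Hadoop SequenceFile; the HDFS path is illustrative:

    // Sketch only: wraps a Hadoop SequenceFileInputFormat via HadoopInputs and
    // consumes it as a DataSet of (key, value) Tuple2 pairs.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class HadoopInputsSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Keys arrive in Tuple2.f0, values in Tuple2.f1, as described in the javadoc above.
            HadoopInputFormat<LongWritable, Text> input =
                HadoopInputs.readSequenceFile(LongWritable.class, Text.class, "hdfs:///data/seq");

            DataSet<Tuple2<LongWritable, Text>> pairs = env.createInput(input);
            pairs.first(10).print();
        }
    }
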
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
deleted file mode 100644
index 97ca329..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.commons.cli.Option;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Utility class to work with Apache Hadoop libraries.
- */
-public class HadoopUtils {
- /**
- * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
- *
- * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
- * @return A {@link ParameterTool}
- * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
- * @see GenericOptionsParser
- */
- public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
- Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
- Map<String, String> map = new HashMap<String, String>();
- for (Option option : options) {
- String[] split = option.getValue().split("=");
- map.put(split[0], split[1]);
- }
- return ParameterTool.fromMap(map);
- }
-}
-
-
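
For reference (not part of this commit): a minimal sketch of the removed HadoopUtils.paramsFromGenericOptionsParser in use; the option names and values are illustrative:

    // Sketch only: converts Hadoop-style -D options into a Flink ParameterTool.
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.hadoopcompatibility.HadoopUtils;

    public class HadoopUtilsSketch {
        public static void main(String[] args) throws Exception {
            // e.g. args = { "-D", "input.path=hdfs:///data/in", "-D", "output.path=hdfs:///data/out" }
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

            String inputPath = params.getRequired("input.path");
            System.out.println("input.path = " + inputPath);
        }
    }
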
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
deleted file mode 100644
index ba8aa90..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
- implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
- private transient JobConf jobConf;
-
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
- private transient Reporter reporter;
-
- /**
- * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
- *
- * @param hadoopMapper The Hadoop Mapper to wrap.
- */
- public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
- this(hadoopMapper, new JobConf());
- }
-
- /**
- * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
- * The Hadoop Mapper is configured with the provided JobConf.
- *
- * @param hadoopMapper The Hadoop Mapper to wrap.
- * @param conf The JobConf that is used to configure the Hadoop Mapper.
- */
- public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
- if(hadoopMapper == null) {
- throw new NullPointerException("Mapper may not be null.");
- }
- if(conf == null) {
- throw new NullPointerException("JobConf may not be null.");
- }
-
- this.mapper = hadoopMapper;
- this.jobConf = conf;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.mapper.configure(jobConf);
-
- this.reporter = new HadoopDummyReporter();
- this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
- }
-
- @Override
- public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
- throws Exception {
- outputCollector.setFlinkCollector(out);
- mapper.map(value.f0, value.f1, outputCollector, reporter);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
- Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-
- final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
- final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
- return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
- }
-
- /**
- * Custom serialization methods.
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
- */
- private void writeObject(final ObjectOutputStream out) throws IOException {
- out.writeObject(mapper.getClass());
- jobConf.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
- Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
- (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
- mapper = InstantiationUtil.instantiate(mapperClass);
-
- jobConf = new JobConf();
- jobConf.readFields(in);
- }
-
-}
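
For reference (not part of this commit): a minimal sketch of wrapping a mapred Mapper with the removed HadoopMapFunction; the UpperCaseMapper class and the sample data are illustrative:

    // Sketch only: a trivial mapred Mapper executed inside a Flink FlatMapFunction.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import java.io.IOException;

    public class HadoopMapFunctionSketch {

        /** Illustrative mapred Mapper that upper-cases the value. */
        public static class UpperCaseMapper
                implements Mapper<LongWritable, Text, LongWritable, Text> {
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Text> out, Reporter reporter) throws IOException {
                out.collect(key, new Text(value.toString().toUpperCase()));
            }
            @Override public void configure(JobConf conf) {}
            @Override public void close() {}
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<LongWritable, Text>> lines = env.fromElements(
                new Tuple2<>(new LongWritable(0L), new Text("hello")),
                new Tuple2<>(new LongWritable(6L), new Text("world")));

            // Each (key, value) pair is handed to the wrapped Mapper; its collector
            // output becomes the elements of the resulting DataSet.
            DataSet<Tuple2<LongWritable, Text>> upper = lines.flatMap(
                new HadoopMapFunction<LongWritable, Text, LongWritable, Text>(new UpperCaseMapper()));

            upper.print();
        }
    }
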
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
deleted file mode 100644
index c1acc2b..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
- implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
- ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
- private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
- private transient JobConf jobConf;
-
- private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
- private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
- private transient Reporter reporter;
-
- /**
- * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
- *
- * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
- * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
- */
- public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
- Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
- this(hadoopReducer, hadoopCombiner, new JobConf());
- }
-
- /**
- * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
- *
- * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
- * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
- * @param conf The JobConf that is used to configure both Hadoop Reducers.
- */
- public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
- Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
- if(hadoopReducer == null) {
- throw new NullPointerException("Reducer may not be null.");
- }
- if(hadoopCombiner == null) {
- throw new NullPointerException("Combiner may not be null.");
- }
- if(conf == null) {
- throw new NullPointerException("JobConf may not be null.");
- }
-
- this.reducer = hadoopReducer;
- this.combiner = hadoopCombiner;
- this.jobConf = conf;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.reducer.configure(jobConf);
- this.combiner.configure(jobConf);
-
- this.reporter = new HadoopDummyReporter();
- Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
- TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
- this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
- this.combineCollector = new HadoopOutputCollector<>();
- this.reduceCollector = new HadoopOutputCollector<>();
- }
-
- @Override
- public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
- throws Exception {
- reduceCollector.setFlinkCollector(out);
- valueIterator.set(values.iterator());
- reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
- }
-
- @Override
- public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
- combineCollector.setFlinkCollector(out);
- valueIterator.set(values.iterator());
- combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
- Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-
- final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
- final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass);
- return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
- }
-
- /**
- * Custom serialization methods.
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
- */
- private void writeObject(final ObjectOutputStream out) throws IOException {
-
- out.writeObject(reducer.getClass());
- out.writeObject(combiner.getClass());
- jobConf.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-
- Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
- reducer = InstantiationUtil.instantiate(reducerClass);
-
- Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
- combiner = InstantiationUtil.instantiate(combinerClass);
-
- jobConf = new JobConf();
- jobConf.readFields(in);
- }
-
-}
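
For reference (not part of this commit): a minimal sketch of the removed HadoopReduceCombineFunction used for a combinable, word-count style aggregation; the SumReducer class and the sample data are illustrative:

    // Sketch only: a mapred Reducer used both as the final reducer and as the combiner.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    import java.io.IOException;
    import java.util.Iterator;

    public class HadoopReduceCombineSketch {

        /** Illustrative mapred Reducer that sums the counts per key. */
        public static class SumReducer
                implements Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            public void reduce(Text key, Iterator<LongWritable> values,
                    OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
                long sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                out.collect(key, new LongWritable(sum));
            }
            @Override public void configure(JobConf conf) {}
            @Override public void close() {}
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Text, LongWritable>> ones = env.fromElements(
                new Tuple2<>(new Text("a"), new LongWritable(1L)),
                new Tuple2<>(new Text("b"), new LongWritable(1L)),
                new Tuple2<>(new Text("a"), new LongWritable(1L)));

            // The same Reducer serves as reducer and combiner, as the class above allows.
            DataSet<Tuple2<Text, LongWritable>> counts = ones
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                    new SumReducer(), new SumReducer()));

            counts.print();
        }
    }
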
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
deleted file mode 100644
index 55aea24..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
- implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
- private transient JobConf jobConf;
-
- private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
- private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
- private transient Reporter reporter;
-
- /**
- * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- *
- * @param hadoopReducer The Hadoop Reducer to wrap.
- */
- public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
- this(hadoopReducer, new JobConf());
- }
-
- /**
- * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- *
- * @param hadoopReducer The Hadoop Reducer to wrap.
- * @param conf The JobConf that is used to configure the Hadoop Reducer.
- */
- public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
- if(hadoopReducer == null) {
- throw new NullPointerException("Reducer may not be null.");
- }
- if(conf == null) {
- throw new NullPointerException("JobConf may not be null.");
- }
-
- this.reducer = hadoopReducer;
- this.jobConf = conf;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.reducer.configure(jobConf);
-
- this.reporter = new HadoopDummyReporter();
- this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
- Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
- TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
- this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
- }
-
- @Override
- public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
- throws Exception {
-
- reduceCollector.setFlinkCollector(out);
- valueIterator.set(values.iterator());
- reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
- Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
- Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-
- final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
- }
-
- /**
- * Custom serialization methods
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
- */
- private void writeObject(final ObjectOutputStream out) throws IOException {
-
- out.writeObject(reducer.getClass());
- jobConf.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-
- Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
- (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
- reducer = InstantiationUtil.instantiate(reducerClass);
-
- jobConf = new JobConf();
- jobConf.readFields(in);
- }
-}
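
For context, the HadoopReduceFunction removed above wraps a Hadoop Reducer (mapred API) so it can run as a non-combinable Flink GroupReduceFunction on a grouped DataSet. The following is a minimal, illustrative sketch only (class name and data are made up); it assumes flink-hadoop-compatibility and a Hadoop client are on the classpath and uses Hadoop's LongSumReducer as a stand-in for an arbitrary Reducer:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.LongSumReducer;

    public class HadoopReduceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // toy key/count pairs; any DataSet<Tuple2<K, V>> of Hadoop types works the same way
            DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
                    new Tuple2<>(new Text("flink"), new LongWritable(1L)),
                    new Tuple2<>(new Text("flink"), new LongWritable(2L)),
                    new Tuple2<>(new Text("hadoop"), new LongWritable(1L)));

            // group on the Hadoop key (tuple field 0) and run the wrapped Reducer once per group
            DataSet<Tuple2<Text, LongWritable>> summed = counts
                    .groupBy(0)
                    .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
                            new LongSumReducer<Text>()));

            summed.print();
        }
    }

Because this wrapper is non-combinable, the Reducer only runs on the reduce side; its combinable sibling in the same package, HadoopReduceCombineFunction, covers the combiner case.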
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
deleted file mode 100644
index bfe03d3..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import java.io.IOException;
-
-/**
- * A Hadoop OutputCollector that wraps a Flink Collector.
- * On each call of collect() the data is forwarded to the wrapped Flink collector.
- */
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
-
- private Collector<Tuple2<KEY,VALUE>> flinkCollector;
-
- private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
-
- /**
- * Set the wrapped Flink collector.
- *
- * @param flinkCollector The Flink Collector to forward collected records to.
- */
- public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
- this.flinkCollector = flinkCollector;
- }
-
- /**
- * Use the wrapped Flink collector to collect a key-value pair for Flink.
- *
- * @param key the key to collect
- * @param val the value to collect
- * @throws IOException if the key-value pair cannot be collected.
- */
- @Override
- public void collect(final KEY key, final VALUE val) throws IOException {
- this.outTuple.f0 = key;
- this.outTuple.f1 = val;
- this.flinkCollector.collect(outTuple);
- }
-}
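
Note that the collector above deliberately reuses a single Tuple2 instance, so downstream code that buffers records must copy them. A small, self-contained sketch of the hand-off from Hadoop-style collect() calls to a Flink Collector (the buffering Collector here is hypothetical test scaffolding, not part of the connector):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class OutputCollectorSketch {
        public static void main(String[] args) throws Exception {
            final List<Tuple2<Text, IntWritable>> buffered = new ArrayList<>();

            // throwaway Flink Collector that buffers whatever it receives
            Collector<Tuple2<Text, IntWritable>> flinkCollector =
                    new Collector<Tuple2<Text, IntWritable>>() {
                        @Override
                        public void collect(Tuple2<Text, IntWritable> record) {
                            // copy the fields: the Hadoop-side collector reuses one Tuple2 instance
                            buffered.add(new Tuple2<>(new Text(record.f0), new IntWritable(record.f1.get())));
                        }

                        @Override
                        public void close() {}
                    };

            HadoopOutputCollector<Text, IntWritable> hadoopCollector = new HadoopOutputCollector<>();
            hadoopCollector.setFlinkCollector(flinkCollector);

            // a Hadoop Mapper or Reducer would issue these calls; each pair is forwarded to Flink
            hadoopCollector.collect(new Text("flink"), new IntWritable(1));
            hadoopCollector.collect(new Text("hadoop"), new IntWritable(2));

            System.out.println(buffered); // [(flink,1), (hadoop,2)]
        }
    }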
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
deleted file mode 100644
index 2d204b8..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
- */
-public class HadoopTupleUnwrappingIterator<KEY,VALUE>
- extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<KEY> keySerializer;
-
- private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-
- private transient KEY curKey;
- private transient VALUE firstValue;
- private transient boolean atFirst;
-
- public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) {
- this.keySerializer = checkNotNull(keySerializer);
- }
-
- /**
- * Set the Flink iterator to wrap.
- *
- * @param iterator The Flink iterator to wrap.
- */
- @Override
- public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
- this.iterator = iterator;
- if(this.hasNext()) {
- final Tuple2<KEY, VALUE> tuple = iterator.next();
- this.curKey = keySerializer.copy(tuple.f0);
- this.firstValue = tuple.f1;
- this.atFirst = true;
- } else {
- this.atFirst = false;
- }
- }
-
- @Override
- public boolean hasNext() {
- if(this.atFirst) {
- return true;
- }
- return iterator.hasNext();
- }
-
- @Override
- public VALUE next() {
- if(this.atFirst) {
- this.atFirst = false;
- return firstValue;
- }
-
- final Tuple2<KEY, VALUE> tuple = iterator.next();
- return tuple.f1;
- }
-
- public KEY getCurrentKey() {
- return this.curKey;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-}
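
The iterator above is what HadoopReduceFunction passes to Reducer.reduce(): it exposes only the value side of the incoming Tuple2 records while keeping a serializer copy of the group's key. A standalone sketch of that behaviour, using Flink's StringSerializer purely for illustration (the class name and data are made up):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

    public class UnwrappingIteratorSketch {
        public static void main(String[] args) {
            // one reduce group: identical keys, three values
            List<Tuple2<String, Integer>> group = Arrays.asList(
                    Tuple2.of("flink", 1), Tuple2.of("flink", 2), Tuple2.of("flink", 3));

            HadoopTupleUnwrappingIterator<String, Integer> values =
                    new HadoopTupleUnwrappingIterator<>(StringSerializer.INSTANCE);
            values.set(group.iterator());

            // the group's key is captured as a serializer copy of the first tuple's key
            System.out.println(values.getCurrentKey()); // flink

            // the iterator itself yields only the value side of the tuples
            while (values.hasNext()) {
                System.out.println(values.next()); // 1, 2, 3
            }
        }
    }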
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
deleted file mode 100644
index 133a5f4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hadoopcompatibility.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
-
-/**
- * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
- *
- * It provides methods to create Flink InputFormat wrappers for Hadoop
- * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
- *
- * Key-value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]], where
- * the first field is the key and the second field is the value.
- *
- */
-object HadoopInputs {
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapred.FileInputFormat]].
- */
- def readHadoopFile[K, V](
- mapredInputFormat: MapredFileInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- inputPath: String,
- job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
- // set input path in JobConf
- MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
- // wrap mapredInputFormat
- createHadoopInput(mapredInputFormat, key, value, job)
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapred.FileInputFormat]].
- */
- def readHadoopFile[K, V](
- mapredInputFormat: MapredFileInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
- readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
- * file with the given key and value classes.
- */
- def readSequenceFile[K, V](
- key: Class[K],
- value: Class[V],
- inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
- readHadoopFile(
- new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
- key,
- value,
- inputPath
- )
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapred.InputFormat]].
- */
- def createHadoopInput[K, V](
- mapredInputFormat: MapredInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
-
- new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
- */
- def readHadoopFile[K, V](
- mapreduceInputFormat: MapreduceFileInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- inputPath: String,
- job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
- // set input path in Job
- MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
- // wrap mapreduceInputFormat
- createHadoopInput(mapreduceInputFormat, key, value, job)
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
- */
- def readHadoopFile[K, V](
- mapreduceInputFormat: MapreduceFileInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
- {
- readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
- }
-
- /**
- * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
- * [[org.apache.hadoop.mapreduce.InputFormat]].
- */
- def createHadoopInput[K, V](
- mapreduceInputFormat: MapreduceInputFormat[K, V],
- key: Class[K],
- value: Class[V],
- job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
-
- new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
- }
-}
-
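
The Scala HadoopInputs helpers above are thin wrappers around the HadoopInputFormat classes in the flink-java Hadoop packages. For consistency with the other examples, here is a Java sketch of the same steps readHadoopFile performs by hand: set the input path on the JobConf, wrap the mapred InputFormat, and read it with createInput (the class name and HDFS path are placeholders):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // same steps as readHadoopFile: set the input path, then wrap the mapred InputFormat
            JobConf jobConf = new JobConf();
            FileInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input")); // placeholder path

            HadoopInputFormat<LongWritable, Text> inputFormat = new HadoopInputFormat<>(
                    new TextInputFormat(), LongWritable.class, Text.class, jobConf);

            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(inputFormat);
            lines.first(10).print();
        }
    }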
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
deleted file mode 100644
index 2aefd9f..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class WritableExtractionTest {
-
- @Test
- public void testDetectWritable() {
- // writable interface itself must not be writable
- assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
-
- // various forms of extension
- assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
- assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
- assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
-
- // some non-writables
- assertFalse(TypeExtractor.isHadoopWritable(String.class));
- assertFalse(TypeExtractor.isHadoopWritable(List.class));
- assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
- }
-
- @Test
- public void testCreateWritableInfo() {
- TypeInformation<DirectWritable> info1 =
- TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
- assertEquals(DirectWritable.class, info1.getTypeClass());
-
- TypeInformation<ViaInterfaceExtension> info2 =
- TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
- assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
-
- TypeInformation<ViaAbstractClassExtension> info3 =
- TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
- assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass());
- }
-
- @Test
- public void testValidateTypeInfo() {
- // validate unrelated type info
- TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
-
- // validate writable type info correctly
- TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
- DirectWritable.class), DirectWritable.class);
- TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
- ViaInterfaceExtension.class), ViaInterfaceExtension.class);
- TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
- ViaAbstractClassExtension.class), ViaAbstractClassExtension.class);
-
- // incorrect case: not writable at all
- try {
- TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
- DirectWritable.class), String.class);
- fail("should have failed with an exception");
- } catch (InvalidTypesException e) {
- // expected
- }
-
- // incorrect case: wrong writable
- try {
- TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
- ViaInterfaceExtension.class), DirectWritable.class);
- fail("should have failed with an exception");
- } catch (InvalidTypesException e) {
- // expected
- }
- }
-
- @Test
- public void testExtractFromFunction() {
- RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() {
- @Override
- public DirectWritable map(DirectWritable value) throws Exception {
- return null;
- }
- };
-
- TypeInformation<DirectWritable> outType =
- TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class));
-
- assertTrue(outType instanceof WritableTypeInfo);
- assertEquals(DirectWritable.class, outType.getTypeClass());
- }
-
- @Test
- public void testExtractAsPartOfPojo() {
- PojoTypeInfo<PojoWithWritable> pojoInfo =
- (PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class);
-
- boolean foundWritable = false;
- for (int i = 0; i < pojoInfo.getArity(); i++) {
- PojoField field = pojoInfo.getPojoFieldAt(i);
- String name = field.getField().getName();
-
- if (name.equals("hadoopCitizen")) {
- if (foundWritable) {
- fail("already seen");
- }
- foundWritable = true;
- assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
- assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass());
-
- }
- }
-
- assertTrue("missed the writable type", foundWritable);
- }
-
- @Test
- public void testInputValidationError() {
-
- RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() {
- @Override
- public String map(Writable value) throws Exception {
- return null;
- }
- };
-
- @SuppressWarnings("unchecked")
- TypeInformation<Writable> inType =
- (TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
-
- try {
- TypeExtractor.getMapReturnTypes(function, inType);
- fail("exception expected");
- }
- catch (InvalidTypesException e) {
- // right
- }
- }
-
- // ------------------------------------------------------------------------
- // test type classes
- // ------------------------------------------------------------------------
-
- public interface ExtendedWritable extends Writable {}
-
- public static abstract class AbstractWritable implements Writable {}
-
- public static class DirectWritable implements Writable {
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {}
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {}
- }
-
- public static class ViaInterfaceExtension implements ExtendedWritable {
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {}
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {}
- }
-
- public static class ViaAbstractClassExtension extends AbstractWritable {
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {}
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {}
- }
-
- public static class PojoWithWritable {
- public String str;
- public DirectWritable hadoopCitizen;
- }
-}
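
The test above exercises the TypeExtractor hooks that recognize Writable implementations, including Writable fields nested in POJOs. In user code this means a POJO carrying, say, a Hadoop Text field is analyzed field by field and the Writable field is typed as a WritableTypeInfo. A small sketch of that (Event is a made-up POJO; it assumes flink-hadoop-compatibility is on the classpath so WritableTypeInfo is available):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.hadoop.io.Text;

    public class WritablePojoSketch {

        // hypothetical user POJO mixing a plain Java field with a Hadoop Writable field
        public static class Event {
            public String id;
            public Text payload;
        }

        public static void main(String[] args) {
            TypeInformation<Event> info = TypeExtractor.getForClass(Event.class);

            // the POJO is analyzed field by field; the Text field comes back as a WritableTypeInfo
            PojoTypeInfo<Event> pojoInfo = (PojoTypeInfo<Event>) info;
            for (int i = 0; i < pojoInfo.getArity(); i++) {
                System.out.println(pojoInfo.getPojoFieldAt(i).getField().getName()
                        + " -> " + pojoInfo.getPojoFieldAt(i).getTypeInformation());
            }
        }
    }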
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
deleted file mode 100644
index 3d2b652..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.Writable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class WritableInfoParserTest {
-
- @Test
- public void testWritableType() {
- TypeInformation<?> ti = TypeInfoParser.parse(
- "Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
-
- Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
- Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
- }
-
- @Test
- public void testPojoWithWritableType() {
- TypeInformation<?> ti = TypeInfoParser.parse(
- "org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
- + "basic=Integer,"
- + "tuple=Tuple2<String, Integer>,"
- + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
- + "array=String[]"
- + ">");
- Assert.assertTrue(ti instanceof PojoTypeInfo);
- PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
- Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
- Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
- Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
- Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
- }
- // ------------------------------------------------------------------------
- // Test types
- // ------------------------------------------------------------------------
-
- public static class MyWritable implements Writable {
-
- @Override
- public void write(DataOutput out) throws IOException {}
-
- @Override
- public void readFields(DataInput in) throws IOException {}
- }
-
- public static class MyPojo {
- public Integer basic;
- public Tuple2<String, Integer> tuple;
- public MyWritable hadoopCitizen;
- public String[] array;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
deleted file mode 100644
index eb9cdf0..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.io.Writable;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class WritableTypeInfoTest extends TestLogger {
-
- @Test
- public void testWritableTypeInfoEquality() {
- WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
- WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class);
-
- assertEquals(tpeInfo1, tpeInfo2);
- assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
- }
-
- @Test
- public void testWritableTypeInfoInequality() {
- WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
- WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class);
-
- assertNotEquals(tpeInfo1, tpeInfo2);
- }
-
- // ------------------------------------------------------------------------
- // test types
- // ------------------------------------------------------------------------
-
- public static class TestClass implements Writable {
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {}
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {}
- }
-
- public static class AlternateClass implements Writable {
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {}
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {}
- }
-}
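
Equality and hashCode of WritableTypeInfo matter because Flink compares TypeInformation instances when validating and wiring operators: two instances built from the same Writable class must be interchangeable. A short sketch of the common user-facing case, declaring the Writable output type explicitly through returns() when it cannot be extracted reliably (the mapping itself is a toy example):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.Text;

    public class WritableTypeInfoSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> words = env.fromElements("flink", "hadoop");

            // declare the Writable output type explicitly; any WritableTypeInfo built from
            // Text.class is equal to this one, which is what the equality test above checks
            DataSet<Text> asText = words
                    .map(word -> new Text(word))
                    .returns(new WritableTypeInfo<>(Text.class));

            asText.print();
        }
    }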