You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:45 UTC
[29/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
deleted file mode 100644
index ba8a55b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.util.List;
-
-import scala.Product;
-
-
-/**
- * These classes encapsulate the logic of accessing a field specified by the user as either an index
- * or a field expression string. TypeInformation can also be requested for the field.
- * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
- */
-public abstract class FieldAccessor<R, F> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- TypeInformation fieldType;
-
- // Note: Returns the corresponding basic type for an array of a primitive type (Integer for int[]).
- @SuppressWarnings("unchecked")
- public TypeInformation<F> getFieldType() {
- return fieldType;
- }
-
-
- public abstract F get(R record);
-
- // Note: This has to return the result, because the SimpleFieldAccessor might not be able to modify the
- // record in place (for example, when R is simply Double). Unfortunately, there is no passing by reference in Java.
- public abstract R set(R record, F fieldValue);
-
-
-
- @SuppressWarnings("unchecked")
- public static <R, F> FieldAccessor<R, F> create(int pos, TypeInformation<R> typeInfo, ExecutionConfig config) {
- if (typeInfo.isTupleType() && ((TupleTypeInfoBase)typeInfo).isCaseClass()) {
- return new ProductFieldAccessor<R, F>(pos, typeInfo, config);
- } else if (typeInfo.isTupleType()) {
- return new TupleFieldAccessor<R, F>(pos, typeInfo);
- } else if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
- return new ArrayFieldAccessor<R, F>(pos, typeInfo);
- } else {
- if(pos != 0) {
- throw new IndexOutOfBoundsException("Not 0th field selected for a simple type (non-tuple, non-array).");
- }
- return (FieldAccessor<R, F>) new SimpleFieldAccessor<R>(typeInfo);
- }
- }
-
- public static <R, F> FieldAccessor<R, F> create(String field, TypeInformation<R> typeInfo, ExecutionConfig config) {
- if (typeInfo.isTupleType() && ((TupleTypeInfoBase)typeInfo).isCaseClass()) {
- int pos = ((TupleTypeInfoBase)typeInfo).getFieldIndex(field);
- if(pos == -2) {
- throw new RuntimeException("Invalid field selected: " + field);
- }
- return new ProductFieldAccessor<R, F>(pos, typeInfo, config);
- } else if(typeInfo.isTupleType()) {
- return new TupleFieldAccessor<R, F>(((TupleTypeInfo)typeInfo).getFieldIndex(field), typeInfo);
- } else {
- return new PojoFieldAccessor<R, F>(field, typeInfo, config);
- }
- }
-
-
-
- public static class SimpleFieldAccessor<R> extends FieldAccessor<R, R> {
-
- private static final long serialVersionUID = 1L;
-
- SimpleFieldAccessor(TypeInformation<R> typeInfo) {
- this.fieldType = typeInfo;
- }
-
- @Override
- public R get(R record) {
- return record;
- }
-
- @Override
- public R set(R record, R fieldValue) {
- return fieldValue;
- }
- }
-
- public static class ArrayFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
- private static final long serialVersionUID = 1L;
-
- int pos;
-
- ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
- this.pos = pos;
- this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public F get(R record) {
- return (F) Array.get(record, pos);
- }
-
- @Override
- public R set(R record, F fieldValue) {
- Array.set(record, pos, fieldValue);
- return record;
- }
- }
-
- public static class TupleFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
- private static final long serialVersionUID = 1L;
-
- int pos;
-
- TupleFieldAccessor(int pos, TypeInformation<R> typeInfo) {
- this.pos = pos;
- this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public F get(R record) {
- Tuple tuple = (Tuple) record;
- return (F)tuple.getField(pos);
- }
-
- @Override
- public R set(R record, F fieldValue) {
- Tuple tuple = (Tuple) record;
- tuple.setField(fieldValue, pos);
- return record;
- }
- }
-
- public static class PojoFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
- private static final long serialVersionUID = 1L;
-
- PojoComparator comparator;
-
- PojoFieldAccessor(String field, TypeInformation<R> type, ExecutionConfig config) {
- if (!(type instanceof CompositeType<?>)) {
- throw new IllegalArgumentException(
- "Key expressions are only supported on POJO types and Tuples. "
- + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
- }
-
- @SuppressWarnings("unchecked")
- CompositeType<R> cType = (CompositeType<R>) type;
-
- List<CompositeType.FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
-
- int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
- this.fieldType = fieldDescriptors.get(0).getType();
- Class<?> keyClass = fieldType.getTypeClass();
-
- if (cType instanceof PojoTypeInfo) {
- comparator = (PojoComparator<R>) cType.createComparator(
- new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
- } else {
- throw new IllegalArgumentException(
- "Key expressions are only supported on POJO types. "
- + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public F get(R record) {
- return (F) comparator.accessField(comparator.getKeyFields()[0], record);
- }
-
- @Override
- public R set(R record, F fieldValue) {
- try {
- comparator.getKeyFields()[0].set(record, fieldValue);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Could not modify the specified field.", e);
- }
- return record;
- }
- }
-
- public static class ProductFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
- private static final long serialVersionUID = 1L;
-
- int pos;
- TupleSerializerBase<R> serializer;
- Object[] fields;
- int length;
-
- ProductFieldAccessor(int pos, TypeInformation<R> typeInfo, ExecutionConfig config) {
- this.pos = pos;
- this.fieldType = ((TupleTypeInfoBase<R>)typeInfo).getTypeAt(pos);
- this.serializer = (TupleSerializerBase<R>)typeInfo.createSerializer(config);
- this.length = this.serializer.getArity();
- this.fields = new Object[this.length];
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public F get(R record) {
- return (F)((Product)record).productElement(pos);
- }
-
- @Override
- public R set(R record, F fieldValue) {
- Product prod = (Product)record;
- for (int i = 0; i < length; i++) {
- fields[i] = prod.productElement(i);
- }
- fields[pos] = fieldValue;
- return serializer.createInstance(fields);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
deleted file mode 100644
index afbd8ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import java.lang.reflect.Array;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Utility class that contains helper methods for manipulating {@link KeySelector} for streaming.
- */
-public final class KeySelectorUtil {
-
- public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
- if (!(typeInfo instanceof CompositeType)) {
- throw new InvalidTypesException(
- "This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
- }
-
- CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
-
- int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
- int numKeyFields = logicalKeyPositions.length;
-
- // use ascending order here, the code paths for that are usually a slight bit faster
- boolean[] orders = new boolean[numKeyFields];
- TypeInformation<?>[] typeInfos = new TypeInformation<?>[numKeyFields];
- for (int i = 0; i < numKeyFields; i++) {
- orders[i] = true;
- typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
- }
-
- TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
- return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
- }
-
- public static <X> ArrayKeySelector<X> getSelectorForArray(int[] positions, TypeInformation<X> typeInfo) {
- if (positions == null || positions.length == 0 || positions.length > Tuple.MAX_ARITY) {
- throw new IllegalArgumentException("Array keys must have between 1 and " + Tuple.MAX_ARITY + " fields.");
- }
-
- TypeInformation<?> componentType;
-
- if (typeInfo instanceof BasicArrayTypeInfo) {
- BasicArrayTypeInfo<X, ?> arrayInfo = (BasicArrayTypeInfo<X, ?>) typeInfo;
- componentType = arrayInfo.getComponentInfo();
- }
- else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
- PrimitiveArrayTypeInfo<X> arrayType = (PrimitiveArrayTypeInfo<X>) typeInfo;
- componentType = arrayType.getComponentType();
- }
- else {
- throw new IllegalArgumentException("This method only supports arrays of primitives and boxed primitives.");
- }
-
- TypeInformation<?>[] primitiveInfos = new TypeInformation<?>[positions.length];
- Arrays.fill(primitiveInfos, componentType);
-
- return new ArrayKeySelector<>(positions, new TupleTypeInfo<>(primitiveInfos));
- }
-
-
- public static <X, K> KeySelector<X, K> getSelectorForOneKey(
- Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
- {
- if (!(typeInfo instanceof CompositeType)) {
- throw new InvalidTypesException(
- "This key operation requires a composite type such as Tuples, POJOs, case classes, etc");
- }
- if (partitioner != null) {
- keys.validateCustomPartitioner(partitioner, null);
- }
-
- CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
- int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
- if (logicalKeyPositions.length != 1) {
- throw new IllegalArgumentException("There must be exactly 1 key specified");
- }
-
- TypeComparator<X> comparator = compositeType.createComparator(
- logicalKeyPositions, new boolean[] { true }, 0, executionConfig);
- return new OneKeySelector<>(comparator);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Private constructor to prevent instantiation.
- */
- private KeySelectorUtil() {
- throw new RuntimeException();
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Key extractor that extracts a single field via a generic comparator.
- *
- * @param <IN> The type of the elements where the key is extracted from.
- * @param <K> The type of the key.
- */
- public static final class OneKeySelector<IN, K> implements KeySelector<IN, K> {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeComparator<IN> comparator;
-
- /** Reusable array to hold the key objects. Since this is initially empty (all positions
- * are null), it does not have any serialization problems */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
- private final Object[] keyArray;
-
- OneKeySelector(TypeComparator<IN> comparator) {
- this.comparator = comparator;
- this.keyArray = new Object[1];
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public K getKey(IN value) throws Exception {
- comparator.extractKeys(value, keyArray, 0);
- return (K) keyArray[0];
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A key selector for selecting key fields via a TypeComparator.
- *
- * @param <IN> The type from which the key is extracted.
- */
- public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeComparator<IN> comparator;
- private final int keyLength;
- private transient TupleTypeInfo<Tuple> tupleTypeInfo;
-
- /** Reusable array to hold the key objects. Since this is initially empty (all positions
- * are null), it does not have any serialization problems */
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
- private final Object[] keyArray;
-
- ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo<Tuple> tupleTypeInfo) {
- this.comparator = comparator;
- this.keyLength = keyLength;
- this.tupleTypeInfo = tupleTypeInfo;
- this.keyArray = new Object[keyLength];
- }
-
- @Override
- public Tuple getKey(IN value) throws Exception {
- Tuple key = Tuple.getTupleClass(keyLength).newInstance();
- comparator.extractKeys(value, keyArray, 0);
- for (int i = 0; i < keyLength; i++) {
- key.setField(keyArray[i], i);
- }
- return key;
- }
-
- @Override
- public TypeInformation<Tuple> getProducedType() {
- if (tupleTypeInfo == null) {
- throw new IllegalStateException("The return type information is not available after serialization");
- }
- return tupleTypeInfo;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A key selector that selects individual array fields as keys and returns them as a Tuple.
- *
- * @param <IN> The type from which the key is extracted, i.e., the array type.
- */
- public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
-
- private static final long serialVersionUID = 1L;
-
- private final int[] fields;
- private final Class<? extends Tuple> tupleClass;
- private transient TupleTypeInfo<Tuple> returnType;
-
- ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
- this.fields = requireNonNull(fields);
- this.returnType = requireNonNull(returnType);
- this.tupleClass = Tuple.getTupleClass(fields.length);
- }
-
- @Override
- public Tuple getKey(IN value) throws Exception {
- Tuple key = tupleClass.newInstance();
- for (int i = 0; i < fields.length; i++) {
- key.setField(Array.get(value, fields[i]), i);
- }
- return key;
- }
-
- @Override
- public TypeInformation<Tuple> getProducedType() {
- if (returnType == null) {
- throw new IllegalStateException("The return type information is not available after serialization");
- }
- return returnType;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
deleted file mode 100644
index f0e4477..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * The deserialization schema describes how to turn the byte messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
- * processed by Flink.
- *
- * @param <T> The type created by the deserialization schema.
- */
-public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
-
- /**
- * Deserializes the byte message.
- *
- * @param message The message, as a byte array.
- * @return The deserialized message as an object.
- */
- T deserialize(byte[] message);
-
- /**
- * Method to decide whether the element signals the end of the stream. If
- * true is returned the element won't be emitted.
- *
- * @param nextElement The element to test for the end-of-stream signal.
- * @return True, if the element signals end of stream, false otherwise.
- */
- boolean isEndOfStream(T nextElement);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
deleted file mode 100644
index ebb785c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return nextElement.equals("q");
- }
-
- @Override
- public byte[] serialize(String element) {
- return SerializationUtils.serialize(element);
- }
-
- @Override
- public String deserialize(byte[] message) {
- return SerializationUtils.deserialize(message);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
deleted file mode 100644
index 4d9aaee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * A "no-op" serialization and deserialization schema for byte strings. The serialized representation is
- * identical with the original representation.
- *
- * <p>This schema never considers a byte string to signal end-of-stream.</p>
- */
-public class RawSchema implements DeserializationSchema<byte[]>, SerializationSchema<byte[], byte[]> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public byte[] deserialize(byte[] message) {
- return message;
- }
-
- @Override
- public boolean isEndOfStream(byte[] nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(byte[] element) {
- return element;
- }
-
- @Override
- public TypeInformation<byte[]> getProducedType() {
- return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
deleted file mode 100644
index 21342b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data to be handed
- * to them in a specific format (for example as byte strings).
- *
- * @param <T> The type to be serialized.
- * @param <R> The serialized representation type.
- */
-public interface SerializationSchema<T, R> extends Serializable {
-
- /**
- * Serializes the incoming element to a specified type.
- *
- * @param element
- * The incoming element to be serialized
- * @return The serialized element.
- */
- R serialize(T element);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
deleted file mode 100644
index 51d2d7f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
- SerializationSchema<String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String deserialize(byte[] message) {
- return new String(message);
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public String serialize(String element) {
- return element;
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
deleted file mode 100644
index 6ff9712..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema that uses Flink's serialization stack to
- * transform typed from and to byte arrays.
- *
- * @param <T> The type to be serialized.
- */
-public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
-
- private static final long serialVersionUID = -5359448468131559102L;
-
- /** The serializer for the actual de-/serialization */
- private final TypeSerializer<T> serializer;
-
- /** The reusable output serialization buffer */
- private transient DataOutputSerializer dos;
-
- /** The type information, to be returned by {@link #getProducedType()}. It is
- * transient, because it is not serializable. Note that this means that the type information
- * is not available at runtime, but only prior to the first serialization / deserialization */
- private transient TypeInformation<T> typeInfo;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new de-/serialization schema for the given type.
- *
- * @param typeInfo The type information for the type de-/serialized by this schema.
- * @param ec The execution config, which is used to parametrize the type serializers.
- */
- public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
- this.typeInfo = typeInfo;
- this.serializer = typeInfo.createSerializer(ec);
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public T deserialize(byte[] message) {
- try {
- return serializer.deserialize(new ByteArrayInputView(message));
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to deserialize message", e);
- }
- }
-
- /**
- * This schema never considers an element to signal end-of-stream, so this method returns always false.
- * @param nextElement The element to test for the end-of-stream signal.
- * @return Returns false.
- */
- @Override
- public boolean isEndOfStream(T nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(T element) {
- if (dos == null) {
- dos = new DataOutputSerializer(16);
- }
-
- try {
- serializer.serialize(element, dos);
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to serialize record", e);
- }
-
- byte[] ret = dos.getByteArray();
- if (ret.length != dos.length()) {
- byte[] n = new byte[dos.length()];
- System.arraycopy(ret, 0, n, 0, dos.length());
- ret = n;
- }
- dos.clear();
- return ret;
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- if (typeInfo != null) {
- return typeInfo;
- }
- else {
- throw new IllegalStateException(
- "The type information is not available after this class has been serialized and distributed.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
deleted file mode 100644
index 1187fe6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-// We have it in this package because we could not mock the methods otherwise
-package org.apache.flink.runtime.io.network.partition.consumer;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Test {@link InputGate} that allows setting multiple channels. Use
- * {@link #sendElement(Object, int)} to offer an element on a specific channel. Use
- * {@link #sendEvent(AbstractEvent, int)} to offer an event on the specified channel. Use
- * {@link #endInput()} to notify all channels of input end.
- */
-public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
-
- private final int numInputChannels;
-
- private final TestInputChannel[] inputChannels;
-
- private final int bufferSize;
-
- private TypeSerializer<T> serializer;
-
- private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;
-
- @SuppressWarnings("unchecked")
- public StreamTestSingleInputGate(
- int numInputChannels,
- int bufferSize,
- TypeSerializer<T> serializer) throws IOException, InterruptedException {
- super(numInputChannels, false);
-
- this.bufferSize = bufferSize;
- this.serializer = serializer;
-
- this.numInputChannels = numInputChannels;
- inputChannels = new TestInputChannel[numInputChannels];
-
- inputQueues = new ConcurrentLinkedQueue[numInputChannels];
-
- setupInputChannels();
- doReturn(bufferSize).when(inputGate).getPageSize();
- }
-
- @SuppressWarnings("unchecked")
- private void setupInputChannels() throws IOException, InterruptedException {
-
- for (int i = 0; i < numInputChannels; i++) {
- final int channelIndex = i;
- final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
- final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
- new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer));
-
- inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
- inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
-
-
- final Answer<Buffer> answer = new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- InputValue<Object> input = inputQueues[channelIndex].poll();
- if (input != null && input.isStreamEnd()) {
- when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
- true);
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
- }
- else if (input != null && input.isStreamRecord()) {
- Object inputElement = input.getStreamRecord();
- final Buffer buffer = new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
-
- recordSerializer.setNextBuffer(buffer);
- delegate.setInstance(inputElement);
- recordSerializer.addRecord(delegate);
-
- // Call getCurrentBuffer to ensure size is set
- return recordSerializer.getCurrentBuffer();
- }
- else if (input != null && input.isEvent()) {
- AbstractEvent event = input.getEvent();
- return EventSerializer.toBuffer(event);
- }
- else {
- synchronized (inputQueues[channelIndex]) {
- inputQueues[channelIndex].wait();
- return answer(invocationOnMock);
- }
- }
- }
- };
-
- when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
-
- inputGate.setInputChannel(new IntermediateResultPartitionID(),
- inputChannels[channelIndex].getInputChannel());
- }
- }
-
- public void sendElement(Object element, int channel) {
- synchronized (inputQueues[channel]) {
- inputQueues[channel].add(InputValue.element(element));
- inputQueues[channel].notifyAll();
- }
- inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
- }
-
- public void sendEvent(AbstractEvent event, int channel) {
- synchronized (inputQueues[channel]) {
- inputQueues[channel].add(InputValue.event(event));
- inputQueues[channel].notifyAll();
- }
- inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
- }
-
- public void endInput() {
- for (int i = 0; i < numInputChannels; i++) {
- synchronized (inputQueues[i]) {
- inputQueues[i].add(InputValue.streamEnd());
- inputQueues[i].notifyAll();
- }
- inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
- }
- }
-
- /**
- * Returns true iff all input queues are empty.
- */
- public boolean allQueuesEmpty() {
-// for (int i = 0; i < numInputChannels; i++) {
-// synchronized (inputQueues[i]) {
-// inputQueues[i].add(InputValue.<T>event(new DummyEvent()));
-// inputQueues[i].notifyAll();
-// inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
-// }
-// }
-
- for (int i = 0; i < numInputChannels; i++) {
- if (inputQueues[i].size() > 0) {
- return false;
- }
- }
- return true;
- }
-
- public static class InputValue<T> {
- private Object elementOrEvent;
- private boolean isStreamEnd;
- private boolean isStreamRecord;
- private boolean isEvent;
-
- private InputValue(Object elementOrEvent, boolean isStreamEnd, boolean isEvent, boolean isStreamRecord) {
- this.elementOrEvent = elementOrEvent;
- this.isStreamEnd = isStreamEnd;
- this.isStreamRecord = isStreamRecord;
- this.isEvent = isEvent;
- }
-
- public static <X> InputValue<X> element(Object element) {
- return new InputValue<X>(element, false, false, true);
- }
-
- public static <X> InputValue<X> streamEnd() {
- return new InputValue<X>(null, true, false, false);
- }
-
- public static <X> InputValue<X> event(AbstractEvent event) {
- return new InputValue<X>(event, false, true, false);
- }
-
- public Object getStreamRecord() {
- return elementOrEvent;
- }
-
- public AbstractEvent getEvent() {
- return (AbstractEvent) elementOrEvent;
- }
-
- public boolean isStreamEnd() {
- return isStreamEnd;
- }
-
- public boolean isStreamRecord() {
- return isStreamRecord;
- }
-
- public boolean isEvent() {
- return isEvent;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
deleted file mode 100644
index dd8dec9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-import org.junit.Test;
-
-public class AggregationFunctionTest {
-
- @Test
- public void groupSumIntegerTest() {
-
- // preparing expected outputs
- List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
- List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
- List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
-
- int groupedSum0 = 0;
- int groupedSum1 = 0;
- int groupedSum2 = 0;
-
- for (int i = 0; i < 9; i++) {
- int groupedSum;
- switch (i % 3) {
- case 0:
- groupedSum = groupedSum0 += i;
- break;
- case 1:
- groupedSum = groupedSum1 += i;
- break;
- default:
- groupedSum = groupedSum2 += i;
- break;
- }
-
- expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
- expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
- expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
- }
-
- // some necessary boiler plate
- TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
-
- ExecutionConfig config = new ExecutionConfig();
-
- KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
- typeInfo, config);
- TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
- // aggregations tested
- ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
- new SumAggregator<>(1, typeInfo, config);
- ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
- 1, typeInfo, AggregationType.MIN, config);
- ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
- 1, typeInfo, AggregationType.MAX, config);
-
- List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
- getInputList(),
- keySelector, keyType);
-
- List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
- getInputList(),
- keySelector, keyType);
-
- List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
- getInputList(),
- keySelector, keyType);
-
- assertEquals(expectedGroupSumList, groupedSumList);
- assertEquals(expectedGroupMinList, groupedMinList);
- assertEquals(expectedGroupMaxList, groupedMaxList);
- }
-
- @Test
- public void pojoGroupSumIntegerTest() {
-
- // preparing expected outputs
- List<MyPojo> expectedGroupSumList = new ArrayList<>();
- List<MyPojo> expectedGroupMinList = new ArrayList<>();
- List<MyPojo> expectedGroupMaxList = new ArrayList<>();
-
- int groupedSum0 = 0;
- int groupedSum1 = 0;
- int groupedSum2 = 0;
-
- for (int i = 0; i < 9; i++) {
- int groupedSum;
- switch (i % 3) {
- case 0:
- groupedSum = groupedSum0 += i;
- break;
- case 1:
- groupedSum = groupedSum1 += i;
- break;
- default:
- groupedSum = groupedSum2 += i;
- break;
- }
-
- expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
- expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
- expectedGroupMaxList.add(new MyPojo(i % 3, i));
- }
-
- // some necessary boiler plate
- TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
-
- ExecutionConfig config = new ExecutionConfig();
-
- KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
- typeInfo, config);
- TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
- // aggregations tested
- ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
- ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
- false, config);
- ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
- false, config);
-
- List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
- getInputPojoList(),
- keySelector, keyType);
-
- List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
- getInputPojoList(),
- keySelector, keyType);
-
- List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
- getInputPojoList(),
- keySelector, keyType);
-
- assertEquals(expectedGroupSumList, groupedSumList);
- assertEquals(expectedGroupMinList, groupedMinList);
- assertEquals(expectedGroupMaxList, groupedMaxList);
- }
-
- @Test
- public void minMaxByTest() {
- // Tuples are grouped on field 0, aggregated on field 1
-
- // preparing expected outputs
- List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
- Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
- Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
- Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
-
- List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
- Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
- Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
- Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
-
- List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
- Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
-
- List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
- Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
- Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
- Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
-
- // some necessary boiler plate
- TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
- .getForObject(Tuple3.of(0,0,0));
-
- ExecutionConfig config = new ExecutionConfig();
-
- KeySelector<Tuple3<Integer, Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
- typeInfo, config);
- TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
- // aggregations tested
- ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
- new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
- ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
- new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
- ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
- new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
- ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
- new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
-
- assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByList(),
- keySelector, keyType));
-
- assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
- getInputByList(),
- keySelector, keyType));
-
- assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
- getInputByList(),
- keySelector, keyType));
-
- assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByList(),
- keySelector, keyType));
- }
-
- @Test
- public void pojoMinMaxByTest() {
- // Pojos are grouped on field 0, aggregated on field 1
-
- // preparing expected outputs
- List<MyPojo3> maxByFirstExpected = ImmutableList.of(
- new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
- new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
- new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
-
- List<MyPojo3> maxByLastExpected = ImmutableList.of(
- new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
- new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
- new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
-
- List<MyPojo3> minByFirstExpected = ImmutableList.of(
- new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
- new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
- new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
-
- List<MyPojo3> minByLastExpected = ImmutableList.of(
- new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
- new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
- new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
-
- // some necessary boiler plate
- TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
-
- ExecutionConfig config = new ExecutionConfig();
-
- KeySelector<MyPojo3, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
- typeInfo, config);
- TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
- // aggregations tested
- ReduceFunction<MyPojo3> maxByFunctionFirst =
- new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
- ReduceFunction<MyPojo3> maxByFunctionLast =
- new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
- ReduceFunction<MyPojo3> minByFunctionFirst =
- new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
- ReduceFunction<MyPojo3> minByFunctionLast =
- new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
-
- assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByPojoList(),
- keySelector, keyType));
-
- assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
- getInputByPojoList(),
- keySelector, keyType));
-
- assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
- getInputByPojoList(),
- keySelector, keyType));
-
- assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
- new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByPojoList(),
- keySelector, keyType));
- }
-
- // *************************************************************************
- // UTILS
- // *************************************************************************
-
- private List<Tuple2<Integer, Integer>> getInputList() {
- ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
- for (int i = 0; i < 9; i++) {
- inputList.add(Tuple2.of(i % 3, i));
- }
- return inputList;
- }
-
- private List<MyPojo> getInputPojoList() {
- ArrayList<MyPojo> inputList = new ArrayList<>();
- for (int i = 0; i < 9; i++) {
- inputList.add(new MyPojo(i % 3, i));
- }
- return inputList;
- }
-
- private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
- ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
- for (int i = 0; i < 9; i++) {
- inputList.add(Tuple3.of(0, i % 3, i));
- }
- return inputList;
- }
-
- private List<MyPojo3> getInputByPojoList() {
- ArrayList<MyPojo3> inputList = new ArrayList<>();
- for (int i = 0; i < 9; i++) {
- inputList.add(new MyPojo3(i % 3, i));
- }
- return inputList;
- }
-
- public static class MyPojo implements Serializable {
-
- private static final long serialVersionUID = 1L;
- public int f0;
- public int f1;
-
- public MyPojo(int f0, int f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- public MyPojo() {
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof MyPojo) {
- return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1;
- } else {
- return false;
- }
- }
- }
-
- public static class MyPojo3 implements Serializable {
-
- private static final long serialVersionUID = 1L;
- public int f0;
- public int f1;
- public int f2;
-
- // Field 0 is always initialized to 0
- public MyPojo3(int f1, int f2) {
- this.f1 = f1;
- this.f2 = f2;
- }
-
- public MyPojo3() {
- }
-
- @Override
- public String toString() {
- return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof MyPojo3) {
- return this.f0 == ((MyPojo3) other).f0
- && this.f1 == ((MyPojo3) other).f1
- && this.f2 == ((MyPojo3) other).f2;
- } else {
- return false;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
deleted file mode 100644
index 68a047c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase {
- private static RuntimeContext srcContext;
- private static RuntimeContext mapContext;
-
- @Test
- public void test() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- env.addSource(new TestSource()).map(new TestMap()).addSink(new NoOpSink<Integer>());
- env.execute();
-
- assertNotEquals(srcContext, mapContext);
-
- }
-
- private static class TestSource extends RichParallelSourceFunction<Integer> {
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- }
-
- @Override
- public void cancel() {
- }
-
- @Override
- public void open(Configuration c) {
- srcContext = getRuntimeContext();
- }
-
- }
-
- private static class TestMap extends RichMapFunction<Integer, Integer> {
-
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
-
- @Override
- public void open(Configuration c) {
- mapContext = getRuntimeContext();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
deleted file mode 100644
index 0f9cbe9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoStreamTest extends StreamingMultipleProgramsTestBase {
-
- private static ArrayList<String> expected = new ArrayList<String>();
-
- @Test
- public void test() {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
- DataStream<Integer> src = env.fromElements(1, 3, 5);
-
- DataStream<Integer> filter1 = src.filter(new FilterFunction<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Integer value) throws Exception {
- return true;
- }
- }).keyBy(new KeySelector<Integer, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- });
-
- DataStream<Tuple2<Integer, Integer>> filter2 = src
- .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> map(Integer value) throws Exception {
- return new Tuple2<Integer, Integer>(value, value + 1);
- }
- })
- .rebalance()
- .filter(new FilterFunction<Tuple2<Integer, Integer>>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
- return true;
- }
- }).disableChaining().keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
- return value.f0;
- }
- });
-
- DataStream<String> connected = filter1.connect(filter2).flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Collector<String> out) throws Exception {
- out.collect(value.toString());
- }
-
- @Override
- public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
- out.collect(value.toString());
- }
- });
-
- connected.addSink(resultSink);
-
- try {
- env.execute();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- expected = new ArrayList<String>();
- expected.addAll(Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5"));
-
- List<String> result = resultSink.getResult();
- Collections.sort(result);
-
- assertEquals(expected, result);
- }
-}