Posted to issues@flink.apache.org by sbcd90 <gi...@git.apache.org> on 2016/02/01 03:25:43 UTC

[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

GitHub user sbcd90 opened a pull request:

    https://github.com/apache/flink/pull/1566

    [FLINK-2678]DataSet API does not support multi-dimensional arrays as keys

    Hello,
    
    @tillrohrmann I have added support for multi-dimensional arrays as keys in the DataSet API. Please review & merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sbcd90/flink FLINK-2678

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1566
    
----
commit ec0846c64c1143dbddabc69df984dbbe64110ca6
Author: Subhobrata Dey <sb...@gmail.com>
Date:   2016-02-01T01:50:43Z

    [FLINK-2678]DataSet API does not support multi-dimensional arrays as keys

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51551151
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    +		if (first.getClass().equals(Boolean.class) && second.getClass().equals(Boolean.class)) {
    +			return new BooleanComparator(true).compare((Boolean) first, (Boolean) second);
    +		}
    +		else if (first.getClass().equals(Byte.class) && second.getClass().equals(Byte.class)) {
    +			return new ByteComparator(true).compare((Byte) first, (Byte) second);
    +		}
    +		else if (first.getClass().equals(Character.class) && second.getClass().equals(Character.class)) {
    +			return new CharComparator(true).compare((Character) first, (Character) second);
    +		}
    +		else if (first.getClass().equals(Double.class) && second.getClass().equals(Double.class)) {
    +			return new DoubleComparator(true).compare((Double) first, (Double) second);
    +		}
    +		else if (first.getClass().equals(Float.class) && second.getClass().equals(Float.class)) {
    +			return new FloatComparator(true).compare((Float) first, (Float) second);
    +		}
    +		else if (first.getClass().equals(Integer.class) && second.getClass().equals(Integer.class)) {
    +			return new IntComparator(true).compare((Integer) first, (Integer) second);
    +		}
    +		else if (first.getClass().equals(Long.class) && second.getClass().equals(Long.class)) {
    +			return new LongComparator(true).compare((Long) first, (Long) second);
    +		}
    +		else if (first.getClass().equals(Short.class) && second.getClass().equals(Short.class)) {
    +			return new ShortComparator(true).compare((Short) first, (Short) second);
    +		}
    +		else if (first.getClass().equals(String.class) && second.getClass().equals(String.class)) {
    +			return new StringComparator(true).compare((String) first, (String) second);
    +		}
    +		return -1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private int parseGenericArray(Object firstArray, Object secondArray) {
    +		int compareResult = 0;
    +		if (firstArray.getClass().isArray() && secondArray.getClass().isArray()) {
    +			int min = Array.getLength(firstArray);
    +			int tempResult = 0;
    +
    +			if (min < Array.getLength(secondArray)) {
    +				tempResult = -1;
    +			}
    +			if (min > Array.getLength(secondArray)) {
    +				min = Array.getLength(secondArray);
    +				tempResult = 1;
    +			}
    +
    +			for(int i=0; i < min; i++) {
    +				int val = parseGenericArray(Array.get(firstArray, i), Array.get(secondArray, i));
    +				if (val != 0 && compareResult == 0) {
    +					compareResult = val;
    +				}
    +			}
    +
    +			if (compareResult == 0) {
    +				compareResult = tempResult;
    +			}
    +		}
    +		else {
    +			compareResult = compareValues(firstArray, secondArray);
    +		}
    +		return compareResult;
    +	}
    +
    +	@Override
    +	public int compare(T[] first, T[] second) {
    --- End diff --
    
    You should check whether `componentInfo` is an `AtomicType` or a `CompositeType` and then create the comparators via the provided methods. You can take a look at the `CompositeType.createComparator` implementation.
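
The check suggested above can be sketched roughly as follows. This is only an illustration under assumptions, not Flink code: `TypeInfo`, `AtomicInfo`, `CompositeInfo` and `ComponentComparators` are hypothetical stand-ins for the type-information classes, and `java.util.Comparator` stands in for `TypeComparator`. The composite branch selects every field as a key, mirroring what the test in this pull request does.

```java
import java.util.Comparator;

// Hypothetical stand-ins for the type-information hierarchy; not the real Flink API.
interface TypeInfo<T> {}

interface AtomicInfo<T> extends TypeInfo<T> {
    Comparator<T> createComparator(boolean ascending);
}

interface CompositeInfo<T> extends TypeInfo<T> {
    int arity();
    Comparator<T> createComparator(int[] keyFields, boolean[] orders);
}

final class ComponentComparators {
    // Picks the factory method that matches the kind of component type.
    static <T> Comparator<T> forComponent(TypeInfo<T> info, boolean ascending) {
        if (info instanceof AtomicInfo) {
            return ((AtomicInfo<T>) info).createComparator(ascending);
        }
        if (info instanceof CompositeInfo) {
            CompositeInfo<T> composite = (CompositeInfo<T>) info;
            int[] keyFields = new int[composite.arity()];
            boolean[] orders = new boolean[composite.arity()];
            for (int i = 0; i < keyFields.length; i++) {
                keyFields[i] = i;       // use every field as a key, in declaration order
                orders[i] = ascending;  // same sort direction for all fields
            }
            return composite.createComparator(keyFields, orders);
        }
        throw new IllegalArgumentException("Component type cannot be used as a key");
    }
}
```

The decision is made once, when the comparator is created, so `compare` itself does not need to inspect classes at runtime.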



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-178174969
  
    @tillrohrmann Thanks for the review. I have a few questions based on your comments. Could you help answer them so that I can proceed?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 closed the pull request at:

    https://github.com/apache/flink/pull/1566



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51453680
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparatorTest.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.ComparatorTestBase;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.junit.Assert;
    +
    +public class GenericArrayComparatorTest extends ComparatorTestBase<char[][]> {
    --- End diff --
    
    Non-primitive types should be tested as well.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997602
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ObjectArrayComparatorCompositeTypeTest.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.ComparatorTestBase;
    +import org.apache.flink.api.common.typeutils.CompositeType;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.junit.Assert;
    +
    +import java.lang.reflect.Array;
    +
    +public class ObjectArrayComparatorCompositeTypeTest extends ComparatorTestBase<Tuple2<String, Integer>[][]> {
    +	private final TypeInformation<Tuple2<String, Integer>[]> componentInfo;
    +
    +	public ObjectArrayComparatorCompositeTypeTest() {
    +		this.componentInfo = ObjectArrayTypeInfo.getInfoFor(new TupleTypeInfo<Tuple>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	protected TypeSerializer<Tuple2<String, Integer>[][]> createSerializer() {
    +		return (TypeSerializer<Tuple2<String, Integer>[][]>) new GenericArraySerializer<Tuple2<String, Integer>[]>(
    +			componentInfo.getTypeClass(),
    +			componentInfo.createSerializer(null));
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	protected TypeComparator<Tuple2<String, Integer>[][]> createComparator(boolean ascending) {
    +		CompositeType<? extends Object> baseComponentInfo = new TupleTypeInfo<Tuple>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
    +		int componentArity = baseComponentInfo.getArity();
    +		int [] logicalKeyFields = new int[componentArity];
    +		boolean[] orders = new boolean[componentArity];
    +
    +		for (int i=0;i < componentArity;i++) {
    +			logicalKeyFields[i] = i;
    +			orders[i] = ascending;
    +		}
    +
    +		return (TypeComparator<Tuple2<String, Integer>[][]>) new ObjectArrayComparator<Tuple2<String, Integer>[], Character>(ascending,
    +			(GenericArraySerializer<Tuple2<String, Integer>[]>) createSerializer(),
    +			((CompositeType<? super Object>) baseComponentInfo).createComparator(logicalKeyFields, orders, 0, null)
    +		);
    +	}
    +
    +	@Override
    +	protected void deepEquals(String message, Tuple2<String, Integer>[][] should, Tuple2<String, Integer>[][] is) {
    +		Assert.assertTrue(should.length==is.length);
    +		for (int i=0;i < should.length;i++) {
    +			Assert.assertTrue(should[i].length==is[i].length);
    +			for (int j=0;j < should[i].length;j++) {
    +				Assert.assertEquals(should[i][j].f0,is[i][j].f0);
    +				Assert.assertEquals(should[i][j].f1,is[i][j].f1);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	protected Tuple2<String, Integer>[][][] getSortedTestData() {
    +		Object result = Array.newInstance(Tuple2.class, new int[]{2, 2, 1});
    +
    +		((Tuple2<String, Integer>[][][]) result)[0][0][0] = new Tuple2<String, Integer>();
    +		((Tuple2<String, Integer>[][][]) result)[0][0][0].f0 = "be";
    +		((Tuple2<String, Integer>[][][]) result)[0][0][0].f1 = 2;
    +
    +		((Tuple2<String, Integer>[][][]) result)[0][1][0] = new Tuple2<String, Integer>();
    --- End diff --
    
    You can write something like `Tuple2.of("not", 3)`, which makes your life easier.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51476173
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    +		if (first.getClass().equals(Boolean.class) && second.getClass().equals(Boolean.class)) {
    +			return new BooleanComparator(true).compare((Boolean) first, (Boolean) second);
    +		}
    +		else if (first.getClass().equals(Byte.class) && second.getClass().equals(Byte.class)) {
    +			return new ByteComparator(true).compare((Byte) first, (Byte) second);
    +		}
    +		else if (first.getClass().equals(Character.class) && second.getClass().equals(Character.class)) {
    +			return new CharComparator(true).compare((Character) first, (Character) second);
    +		}
    +		else if (first.getClass().equals(Double.class) && second.getClass().equals(Double.class)) {
    +			return new DoubleComparator(true).compare((Double) first, (Double) second);
    +		}
    +		else if (first.getClass().equals(Float.class) && second.getClass().equals(Float.class)) {
    +			return new FloatComparator(true).compare((Float) first, (Float) second);
    +		}
    +		else if (first.getClass().equals(Integer.class) && second.getClass().equals(Integer.class)) {
    +			return new IntComparator(true).compare((Integer) first, (Integer) second);
    +		}
    +		else if (first.getClass().equals(Long.class) && second.getClass().equals(Long.class)) {
    +			return new LongComparator(true).compare((Long) first, (Long) second);
    +		}
    +		else if (first.getClass().equals(Short.class) && second.getClass().equals(Short.class)) {
    +			return new ShortComparator(true).compare((Short) first, (Short) second);
    +		}
    +		else if (first.getClass().equals(String.class) && second.getClass().equals(String.class)) {
    +			return new StringComparator(true).compare((String) first, (String) second);
    +		}
    +		return -1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private int parseGenericArray(Object firstArray, Object secondArray) {
    +		int compareResult = 0;
    +		if (firstArray.getClass().isArray() && secondArray.getClass().isArray()) {
    +			int min = Array.getLength(firstArray);
    +			int tempResult = 0;
    +
    +			if (min < Array.getLength(secondArray)) {
    +				tempResult = -1;
    +			}
    +			if (min > Array.getLength(secondArray)) {
    +				min = Array.getLength(secondArray);
    +				tempResult = 1;
    +			}
    +
    +			for(int i=0; i < min; i++) {
    +				int val = parseGenericArray(Array.get(firstArray, i), Array.get(secondArray, i));
    +				if (val != 0 && compareResult == 0) {
    +					compareResult = val;
    +				}
    +			}
    +
    +			if (compareResult == 0) {
    +				compareResult = tempResult;
    +			}
    +		}
    +		else {
    +			compareResult = compareValues(firstArray, secondArray);
    +		}
    +		return compareResult;
    +	}
    +
    +	@Override
    +	public int compare(T[] first, T[] second) {
    --- End diff --
    
    `ObjectArrayTypeInfo.componentInfo` is of type `TypeInformation<T>`, and accessing a method like `createComparator` from one of the implementing classes of `TypeInformation<T>` is difficult. One possible solution is to have a static map like `TYPES` that maps classes to comparator classes. From `ObjectArrayTypeInfo.componentInfo` I get the type class and look up the corresponding comparator in the map. Is there a better solution?
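
For illustration, the static-map idea described above might look like the sketch below. The names `ComparatorRegistry`, `COMPARATORS` and `lookup` are hypothetical, and plain `java.util.Comparator` instances stand in for the comparator classes; none of this is taken from the pull request.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry mapping element classes to comparators.
final class ComparatorRegistry {
    private static final Map<Class<?>, Comparator<?>> COMPARATORS = new HashMap<>();

    static {
        COMPARATORS.put(Boolean.class, Comparator.<Boolean>naturalOrder());
        COMPARATORS.put(Integer.class, Comparator.<Integer>naturalOrder());
        COMPARATORS.put(Long.class, Comparator.<Long>naturalOrder());
        COMPARATORS.put(Double.class, Comparator.<Double>naturalOrder());
        COMPARATORS.put(String.class, Comparator.<String>naturalOrder());
    }

    // Returns the comparator registered for the given class, or fails loudly.
    @SuppressWarnings("unchecked")
    static <T> Comparator<T> lookup(Class<T> type) {
        Comparator<?> comparator = COMPARATORS.get(type);
        if (comparator == null) {
            throw new IllegalArgumentException("No comparator registered for " + type.getName());
        }
        // Safe as long as each entry was registered under its own element class.
        return (Comparator<T>) comparator;
    }
}
```

A limitation of this sketch is that the lookup fails for any class that was not registered in advance, for example a tuple type.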



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-178097783
  
    Thanks for your contribution @sbcd90. I think the PR still needs a bit of work before it can be merged. The implementation of `GenericArrayComparator` only works for primitive types. Furthermore, I think it is easier to construct the comparator from the component type information of the `ObjectArrayTypeInfo` than to repeat the type analysis every time you call `compare`.
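
One way to read this suggestion, as a sketch under assumptions rather than the actual Flink design: build the array comparator once from a comparator for its elements, and nest the construction for multi-dimensional arrays. `ArrayComparators` and `forArray` are hypothetical names, and `java.util.Comparator` stands in for `TypeComparator`.

```java
import java.util.Comparator;

final class ArrayComparators {
    // Builds a lexicographic array comparator from an element comparator.
    // The element comparator is fixed at construction time, so compare()
    // performs no class inspection.
    static <T> Comparator<T[]> forArray(Comparator<? super T> element) {
        return (first, second) -> {
            int min = Math.min(first.length, second.length);
            for (int i = 0; i < min; i++) {
                int result = element.compare(first[i], second[i]);
                if (result != 0) {
                    return result; // the first differing element decides
                }
            }
            // All shared positions are equal: the shorter array sorts first.
            return Integer.compare(first.length, second.length);
        };
    }
}
```

A comparator for `Integer[][]` can then be obtained by nesting, for example `ArrayComparators.forArray(rowComparator)` where `rowComparator` is itself the result of `ArrayComparators.forArray(Comparator.<Integer>naturalOrder())`.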



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-178034715
  
    @StephanEwen could you also help review the code?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51453894
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java ---
    @@ -311,7 +311,7 @@ public void testPojoSingleOrderFull() {
     			.sortLocalOutput("*", Order.ASCENDING);
     	}
     
    -	@Test(expected = InvalidProgramException.class)
    +	@Test
     	public void testArrayOrderFull() {
    --- End diff --
    
    That is not how you fix a test. When the behaviour of the program changes, the test should be adapted accordingly, e.g. by checking that the order of the elements is correct.
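
As a rough sketch of what checking the order could mean, a test can assert that adjacent elements of the output never violate the expected comparator. `OrderCheck` and `isSorted` are hypothetical helper names and are not part of Flink or of `DataSinkTest`.

```java
import java.util.Comparator;
import java.util.List;

final class OrderCheck {
    // Returns true if no adjacent pair is out of order under the given comparator.
    static <T> boolean isSorted(List<T> items, Comparator<? super T> order) {
        for (int i = 1; i < items.size(); i++) {
            if (order.compare(items.get(i - 1), items.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }
}
```

A test would collect the sorted output and assert that `isSorted` returns true for it, instead of only removing the expected exception.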



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-180284317
  
    I think the PR is still not in good shape. I pointed out the problematic code sections.
    
    Also please bear in mind that people will always try to review your code as soon as possible. But sometimes people are just busy and it takes some time for them to review a PR. In such a case, one simply has to be a little bit patient with them.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51476344
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparatorTest.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.ComparatorTestBase;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.junit.Assert;
    +
    +public class GenericArrayComparatorTest extends ComparatorTestBase<char[][]> {
    --- End diff --
    
    Should another test class be created for testing non-primitive types?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-179599473
  
    Hello @tillrohrmann, I have made all the changes you mentioned:
    
    - Added support for CompositeType.
    - Added test cases for both primitive and composite types.
    - Modified the changed test case in DataSinkTest.java.
    - Renamed the comparator to ObjectArrayComparator.java.
    - Javadocs are auto-generated by IntelliJ.
    
    Please review my code now. Kindly let me know if any further changes are required, or whether this can be merged now.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997124
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---
    @@ -72,15 +77,59 @@ public int getTotalFields() {
     
     	@Override
     	public boolean isKeyType() {
    -		return false;
    +		return true;
     	}
     
     	@SuppressWarnings("unchecked")
     	@Override
     	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
     		return (TypeSerializer<T>) new GenericArraySerializer<C>(
    -			componentInfo.getTypeClass(),
    -			componentInfo.createSerializer(executionConfig));
    +				componentInfo.getTypeClass(),
    +				componentInfo.createSerializer(executionConfig));
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private TypeComparator<? super Object> getBaseComparatorInfo(TypeInformation<? extends Object> componentInfo, boolean sortOrderAscending, ExecutionConfig executionConfig) {
    --- End diff --
    
    Why do you extract the element comparator for certain types (for example the `ObjectArrayTypeInfo`), while for others you create the `TypeComparator` (for example the `CompositeTypeInfo`)? I don't get it. Why do you need the `getBaseComparatorInfo` method at all? Simply check the different subtypes in `createComparator` and then create the `TypeComparator`.
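
The suggestion in this comment can be sketched in plain Java. This is only a minimal sketch under stated assumptions, not Flink code: `java.util.Comparator` stands in for `TypeComparator`, and `TypeInfoSketch`, `AtomicInfo`, `ArrayInfo` and `ComparatorFactorySketch` are hypothetical names invented for illustration. Each array level asks only its component type for a comparator, so no separate helper has to drill down to the base element type. Primitive arrays and composite types are left out to keep the example short.

```java
import java.util.Comparator;

// Hypothetical stand-ins for type information classes; these are not Flink's real API.
interface TypeInfoSketch {}

// A leaf type whose values are compared by their natural ordering.
final class AtomicInfo implements TypeInfoSketch {}

// An object array type that knows the type information of its components.
final class ArrayInfo implements TypeInfoSketch {
    final TypeInfoSketch componentInfo;

    ArrayInfo(TypeInfoSketch componentInfo) {
        this.componentInfo = componentInfo;
    }
}

final class ComparatorFactorySketch {

    // Checks the subtype of the type information and creates the matching comparator.
    @SuppressWarnings("unchecked")
    static Comparator<Object> createComparator(TypeInfoSketch info, boolean ascending) {
        if (info instanceof AtomicInfo) {
            Comparator<Object> natural = (a, b) -> ((Comparable<Object>) a).compareTo(b);
            return ascending ? natural : natural.reversed();
        } else if (info instanceof ArrayInfo) {
            // A nested array is handled by the component's own comparator.
            Comparator<Object> element =
                createComparator(((ArrayInfo) info).componentInfo, ascending);
            return (a, b) -> compareArrays((Object[]) a, (Object[]) b, element, ascending);
        }
        throw new IllegalArgumentException(
            "Could not create a comparator for " + info.getClass().getName());
    }

    // Element-wise comparison; the element comparator already carries the sort order.
    private static int compareArrays(
            Object[] first, Object[] second, Comparator<Object> element, boolean ascending) {
        int common = Math.min(first.length, second.length);
        for (int i = 0; i < common; i++) {
            int result = element.compare(first[i], second[i]);
            if (result != 0) {
                return result;
            }
        }
        // Assumption: a shorter array sorts before a longer one with the same prefix.
        int byLength = Integer.compare(first.length, second.length);
        return ascending ? byLength : -byLength;
    }
}
```

With this shape, a two-dimensional key such as `Integer[][]` would be described as `new ArrayInfo(new ArrayInfo(new AtomicInfo()))`, and the recursion in `createComparator` builds one comparator per array level.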



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997240
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---
    @@ -72,15 +77,59 @@ public int getTotalFields() {
     
     	@Override
     	public boolean isKeyType() {
    -		return false;
    +		return true;
     	}
     
     	@SuppressWarnings("unchecked")
     	@Override
     	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
     		return (TypeSerializer<T>) new GenericArraySerializer<C>(
    -			componentInfo.getTypeClass(),
    -			componentInfo.createSerializer(executionConfig));
    +				componentInfo.getTypeClass(),
    +				componentInfo.createSerializer(executionConfig));
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private TypeComparator<? super Object> getBaseComparatorInfo(TypeInformation<? extends Object> componentInfo, boolean sortOrderAscending, ExecutionConfig executionConfig) {
    +		/**
    +		 * method tries to find out the Comparator to be used to compare each element (of primitive type or composite type) of the provided Object arrays.
    +		 */
    +		if (componentInfo instanceof ObjectArrayTypeInfo) {
    +			return getBaseComparatorInfo(((ObjectArrayTypeInfo) componentInfo).getComponentInfo(), sortOrderAscending, executionConfig);
    +		}
    +		else if (componentInfo instanceof PrimitiveArrayTypeInfo) {
    +			return getBaseComparatorInfo(((PrimitiveArrayTypeInfo<? extends Object>) componentInfo).getComponentType(), sortOrderAscending, executionConfig);
    +		}
    +		else {
    +			if (componentInfo instanceof AtomicType) {
    +				return ((AtomicType<? super Object>) componentInfo).createComparator(sortOrderAscending, executionConfig);
    +			}
    +			else if (componentInfo instanceof CompositeType) {
    +				int componentArity = ((CompositeType<? extends Object>) componentInfo).getArity();
    +				int [] logicalKeyFields = new int[componentArity];
    +				boolean[] orders = new boolean[componentArity];
    +
    +				for (int i=0;i < componentArity;i++) {
    +					logicalKeyFields[i] = i;
    +					orders[i] = sortOrderAscending;
    +				}
    +
    +				return ((CompositeType<? super Object>) componentInfo).createComparator(logicalKeyFields, orders, 0, executionConfig);
    +			}
    +			else {
    +				throw new IllegalArgumentException("Could not add a comparator for the component type " + componentInfo.getClass().getName());
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
    +
    +		return (TypeComparator<T>) new ObjectArrayComparator<T,C>(
    +			sortOrderAscending,
    +			(GenericArraySerializer<T>) createSerializer(executionConfig),
    --- End diff --
    
    Why this cast here?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51448517
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    --- End diff --
    
    Javadocs are missing.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51453735
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    --- End diff --
    
    Why not call it `ObjectArrayComparator`?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997409
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ObjectArrayComparator.java ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +
    +public class ObjectArrayComparator<T,C> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	private TypeComparator<? super Object> comparatorInfo;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public ObjectArrayComparator(boolean ascending, TypeSerializer<T[]> serializer, TypeComparator<? super Object> comparatorInfo) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +		this.comparatorInfo = comparatorInfo;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((ObjectArrayComparator<T,C>) referencedComparator).reference, reference);
    +		return comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    +		/**
    +		 * uses the chosen comparator ( of primitive or composite type ) & compares the provided objects as input
    +		 */
    +		return comparatorInfo.compare(first, second);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private int parseGenericArray(Object firstArray, Object secondArray) {
    --- End diff --
    
    Why exactly are you doing all the array operations on `Object`s?



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51453634
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    +		if (first.getClass().equals(Boolean.class) && second.getClass().equals(Boolean.class)) {
    +			return new BooleanComparator(true).compare((Boolean) first, (Boolean) second);
    +		}
    +		else if (first.getClass().equals(Byte.class) && second.getClass().equals(Byte.class)) {
    +			return new ByteComparator(true).compare((Byte) first, (Byte) second);
    +		}
    +		else if (first.getClass().equals(Character.class) && second.getClass().equals(Character.class)) {
    +			return new CharComparator(true).compare((Character) first, (Character) second);
    +		}
    +		else if (first.getClass().equals(Double.class) && second.getClass().equals(Double.class)) {
    +			return new DoubleComparator(true).compare((Double) first, (Double) second);
    +		}
    +		else if (first.getClass().equals(Float.class) && second.getClass().equals(Float.class)) {
    +			return new FloatComparator(true).compare((Float) first, (Float) second);
    +		}
    +		else if (first.getClass().equals(Integer.class) && second.getClass().equals(Integer.class)) {
    +			return new IntComparator(true).compare((Integer) first, (Integer) second);
    +		}
    +		else if (first.getClass().equals(Long.class) && second.getClass().equals(Long.class)) {
    +			return new LongComparator(true).compare((Long) first, (Long) second);
    +		}
    +		else if (first.getClass().equals(Short.class) && second.getClass().equals(Short.class)) {
    +			return new ShortComparator(true).compare((Short) first, (Short) second);
    +		}
    +		else if (first.getClass().equals(String.class) && second.getClass().equals(String.class)) {
    +			return new StringComparator(true).compare((String) first, (String) second);
    +		}
    +		return -1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private int parseGenericArray(Object firstArray, Object secondArray) {
    +		int compareResult = 0;
    +		if (firstArray.getClass().isArray() && secondArray.getClass().isArray()) {
    +			int min = Array.getLength(firstArray);
    +			int tempResult = 0;
    +
    +			if (min < Array.getLength(secondArray)) {
    +				tempResult = -1;
    +			}
    +			if (min > Array.getLength(secondArray)) {
    +				min = Array.getLength(secondArray);
    +				tempResult = 1;
    +			}
    +
    +			for(int i=0; i < min; i++) {
    +				int val = parseGenericArray(Array.get(firstArray, i), Array.get(secondArray, i));
    +				if (val != 0 && compareResult == 0) {
    +					compareResult = val;
    +				}
    +			}
    +
    +			if (compareResult == 0) {
    +				compareResult = tempResult;
    +			}
    +		}
    +		else {
    +			compareResult = compareValues(firstArray, secondArray);
    +		}
    +		return compareResult;
    +	}
    +
    +	@Override
    +	public int compare(T[] first, T[] second) {
    --- End diff --
    
    I think the `compare` method should only check that the lengths of `first` and `second` are the same and then apply the component comparator (derived from `ObjectArrayTypeInfo.componentInfo`) to the elements.
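
A minimal sketch of such a `compare` method follows, written against `java.util.Comparator` rather than Flink's `TypeComparator` so that it is self-contained. The class name `ElementWiseCompareSketch` is hypothetical, and the rule that a shorter array sorts before a longer one sharing the same prefix is an assumption; the comment above only asks for a length check.

```java
import java.util.Comparator;

final class ElementWiseCompareSketch {

    // Compares two arrays lexicographically, delegating each element to the component comparator.
    static <C> int compare(C[] first, C[] second, Comparator<? super C> componentComparator) {
        int common = Math.min(first.length, second.length);
        for (int i = 0; i < common; i++) {
            int result = componentComparator.compare(first[i], second[i]);
            if (result != 0) {
                return result; // the first differing element decides
            }
        }
        // Assumption: when one array is a prefix of the other, the shorter one sorts first.
        return Integer.compare(first.length, second.length);
    }
}
```

Because the arrays are typed as `C[]`, no reflective access through `java.lang.reflect.Array` is needed, and the loop can return at the first differing element instead of visiting the remaining ones.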



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997479
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ObjectArrayComparator.java ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +
    +public class ObjectArrayComparator<T,C> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	private TypeComparator<? super Object> comparatorInfo;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public ObjectArrayComparator(boolean ascending, TypeSerializer<T[]> serializer, TypeComparator<? super Object> comparatorInfo) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +		this.comparatorInfo = comparatorInfo;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((ObjectArrayComparator<T,C>) referencedComparator).reference, reference);
    +		return comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    +		/**
    +		 * uses the chosen comparator ( of primitive or composite type ) & compares the provided objects as input
    +		 */
    +		return comparatorInfo.compare(first, second);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private int parseGenericArray(Object firstArray, Object secondArray) {
    --- End diff --
    
    I think this is not the right way to go. Simply check the length of the arrays in the `compare` method and then call the type comparator of the array's element type on all the elements.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1566#issuecomment-180152337
  
    Hello @tillrohrmann, please review the commit; I believe the remaining work is nearly complete. Please comment.



[GitHub] flink pull request: [FLINK-2678]DataSet API does not support multi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51453182
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private transient T[] reference;
    +
    +	protected final boolean ascendingComparison;
    +
    +	private final TypeSerializer<T[]> serializer;
    +
    +	// For use by getComparators
    +	@SuppressWarnings("rawtypes")
    +	private final TypeComparator[] comparators = new TypeComparator[] {this};
    +
    +	public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) {
    +		this.ascendingComparison = ascending;
    +		this.serializer = serializer;
    +	}
    +
    +	@Override
    +	public void setReference(T[] reference) {
    +		this.reference = reference;
    +	}
    +
    +	@Override
    +	public boolean equalToReference(T[] candidate) {
    +		return compare(this.reference, candidate) == 0;
    +	}
    +
    +	@Override
    +	public int compareToReference(TypeComparator<T[]> referencedComparator) {
    +		int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
    +		T[] firstArray = serializer.deserialize(firstSource);
    +		T[] secondArray = serializer.deserialize(secondSource);
    +
    +		int comp = compare(firstArray, secondArray);
    +		return ascendingComparison ? comp : -comp;
    +	}
    +
    +	@Override
    +	public int extractKeys(Object record, Object[] target, int index) {
    +		target[index] = record;
    +		return 1;
    +	}
    +
    +	@Override
    +	public TypeComparator[] getFlatComparators() {
    +		return comparators;
    +	}
    +
    +	@Override
    +	public boolean supportsNormalizedKey() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean supportsSerializationWithKeyNormalization() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getNormalizeKeyLen() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException {
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	@Override
    +	public boolean invertNormalizedKey() {
    +		return !ascendingComparison;
    +	}
    +
    +	@Override
    +	public int hash(T[] record) {
    +		return Arrays.hashCode(record);
    +	}
    +
    +	private int compareValues(Object first, Object second) {
    --- End diff --
    
    This only works for primitive types. What about complex types?
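
    To illustrate the concern, here is a small sketch. It assumes, purely for illustration, a value comparison that casts its arguments to `Comparable`; the body of `compareValues` is not shown in this diff, so this is not a reproduction of it. `Point`, `compareAsComparable`, and `POINT_ORDER` are hypothetical names. The cast works for boxed primitives but fails for a composite type that does not implement `Comparable`, whereas a comparator supplied for the element type handles both.

```java
import java.util.Comparator;

/** Hypothetical illustration; not the code from the pull request. */
final class ComplexElementSketch {

    /** A composite element type that does not implement Comparable. */
    static final class Point {
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Assumes every element is Comparable; throws ClassCastException for Point. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareAsComparable(Object first, Object second) {
        return ((Comparable) first).compareTo(second);
    }

    /** Comparator for Point, standing in for the element type's own comparator. */
    static final Comparator<Point> POINT_ORDER =
            Comparator.<Point>comparingInt(p -> p.x).thenComparingInt(p -> p.y);
}
```

    Delegating to a comparator that belongs to the element type avoids any assumption about how the elements themselves can be compared.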

