Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 19:42:49 UTC
[1/2] flink git commit: [tests] Move manual test 'RecordsAndWidthsCombinationCheck' to proper tests scope
Repository: flink
Updated Branches:
refs/heads/master 824785e26 -> f5016439b
[tests] Move manual test 'RecordsAndWidthsCombinationCheck' to proper tests scope
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38098cf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38098cf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38098cf2
Branch: refs/heads/master
Commit: 38098cf2223f1779df06b1b7b79ab5261ba33d8b
Parents: 824785e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 27 16:27:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 16:27:28 2015 +0200
----------------------------------------------------------------------
.../hash/RecordsAndWidthsCombinationCheck.java | 199 ------------------
.../HashTableRecordWidthCombinations.java | 200 +++++++++++++++++++
2 files changed, 200 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
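For context on the check being moved: it sweeps combinations of record count (3400..3549) and serialized record width (270..319 bytes) against the MutableHashTable, so that records land on many different page-boundary alignments. A hedged reading of the width arithmetic, with names mirroring the code below (the 8 + 4 byte split is an assumption about the serializers' wire format, not stated in the commit):

    // Each build record is a Tuple2<Long, byte[]>. Serialized, that is
    // presumably 8 bytes for the Long key, plus 4 bytes for the array
    // length header written by BytePrimitiveArraySerializer, plus the
    // payload bytes. The payload is sized so the whole record hits
    // recordLen exactly:
    final int recordLen = 300;  // one point in the 270..319 sweep
    final byte[] payload = new byte[recordLen - 8 - 4];
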
http://git-wip-us.apache.org/repos/asf/flink/blob/38098cf2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
deleted file mode 100644
index 56ee1da..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.fail;
-
-public class RecordsAndWidthsCombinationCheck {
-
- public static void main(String[] args) throws Exception {
-
- @SuppressWarnings("unchecked")
- final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
- new TupleSerializer<Tuple2<Long, byte[]>>(
- (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
- new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
-
- final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
-
- final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(
- new int[] {0},
- new TypeComparator<?>[] { new LongComparator(true) },
- new TypeSerializer<?>[] { LongSerializer.INSTANCE });
-
- final TypeComparator<Long> probeComparator = new LongComparator(true);
-
- final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
-
- private long ref;
-
- @Override
- public void setReference(Long reference) {
- ref = reference;
- }
-
- @Override
- public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
- //noinspection UnnecessaryUnboxing
- return candidate.f0.longValue() == ref;
- }
-
- @Override
- public int compareToReference(Tuple2<Long, byte[]> candidate) {
- long x = ref;
- long y = candidate.f0;
- return (x < y) ? -1 : ((x == y) ? 0 : 1);
- }
- };
-
- final IOManager ioMan = new IOManagerAsync();
-
- try {
- final int pageSize = 32*1024;
- final int numSegments = 34;
-
- for (int num = 3400; num < 3550; num++) {
- final int numRecords = num;
-
- for (int recordLen = 270; recordLen < 320; recordLen++) {
-
- final byte[] payload = new byte[recordLen - 8 - 4];
-
- System.out.println("testing " + numRecords + " / " + recordLen);
-
- List<MemorySegment> memory = getMemory(numSegments, pageSize);
-
- // we create a hash table that thinks the records are super large. that makes it choose initially
- // a lot of memory for the partition buffers, and start with a smaller hash table. that way
- // we trigger a hash table growth early.
- MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
- buildSerializer, probeSerializer, buildComparator, probeComparator,
- pairComparator, memory, ioMan, 16, false);
-
- final MutableObjectIterator<Tuple2<Long, byte[]>> buildInput = new MutableObjectIterator<Tuple2<Long, byte[]>>() {
-
- private int count = 0;
-
- @Override
- public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
- return next();
- }
-
- @Override
- public Tuple2<Long, byte[]> next() {
- if (count++ < numRecords) {
- return new Tuple2<>(42L, payload);
- } else {
- return null;
- }
- }
- };
-
- // probe side
- final MutableObjectIterator<Long> probeInput = new MutableObjectIterator<Long>() {
-
- private final long numRecords = 10000;
- private long value = 0;
-
- @Override
- public Long next(Long aLong) {
- return next();
- }
-
- @Override
- public Long next() {
- if (value < numRecords) {
- return value++;
- } else {
- return null;
- }
- }
- };
-
- table.open(buildInput, probeInput);
-
- try {
- while (table.nextRecord()) {
- MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
- while (matches.next() != null);
- }
- }
- catch (RuntimeException e) {
- if (!e.getMessage().contains("exceeded maximum number of recursions")) {
- throw e;
- }
- }
- finally {
- table.close();
- }
-
- // make sure no temp files are left
- checkNoTempFilesRemain(ioMan);
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- ioMan.shutdown();
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
- ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
- for (int i = 0; i < numSegments; i++) {
- list.add(new MemorySegment(new byte[segmentSize]));
- }
- return list;
- }
-
- private static void checkNoTempFilesRemain(IOManager ioManager) {
- for (File dir : ioManager.getSpillingDirectories()) {
- for (String file : dir.list()) {
- if (file != null && !(file.equals(".") || file.equals(".."))) {
- fail("hash table did not clean up temp files. remaining file: " + file);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/38098cf2/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
new file mode 100644
index 0000000..493de07
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.operators.hash.MutableHashTable;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class HashTableRecordWidthCombinations {
+
+ public static void main(String[] args) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
+ new TupleSerializer<Tuple2<Long, byte[]>>(
+ (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
+
+ final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
+
+ final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(
+ new int[] {0},
+ new TypeComparator<?>[] { new LongComparator(true) },
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE });
+
+ final TypeComparator<Long> probeComparator = new LongComparator(true);
+
+ final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
+
+ private long ref;
+
+ @Override
+ public void setReference(Long reference) {
+ ref = reference;
+ }
+
+ @Override
+ public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
+ //noinspection UnnecessaryUnboxing
+ return candidate.f0.longValue() == ref;
+ }
+
+ @Override
+ public int compareToReference(Tuple2<Long, byte[]> candidate) {
+ long x = ref;
+ long y = candidate.f0;
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ };
+
+ final IOManager ioMan = new IOManagerAsync();
+
+ try {
+ final int pageSize = 32*1024;
+ final int numSegments = 34;
+
+ for (int num = 3400; num < 3550; num++) {
+ final int numRecords = num;
+
+ for (int recordLen = 270; recordLen < 320; recordLen++) {
+
+ final byte[] payload = new byte[recordLen - 8 - 4];
+
+ System.out.println("testing " + numRecords + " / " + recordLen);
+
+ List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+ // we create a hash table that thinks the records are super large. that makes it choose initially
+ // a lot of memory for the partition buffers, and start with a smaller hash table. that way
+ // we trigger a hash table growth early.
+ MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
+ buildSerializer, probeSerializer, buildComparator, probeComparator,
+ pairComparator, memory, ioMan, 16, false);
+
+ final MutableObjectIterator<Tuple2<Long, byte[]>> buildInput = new MutableObjectIterator<Tuple2<Long, byte[]>>() {
+
+ private int count = 0;
+
+ @Override
+ public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
+ return next();
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next() {
+ if (count++ < numRecords) {
+ return new Tuple2<>(42L, payload);
+ } else {
+ return null;
+ }
+ }
+ };
+
+ // probe side
+ final MutableObjectIterator<Long> probeInput = new MutableObjectIterator<Long>() {
+
+ private final long numRecords = 10000;
+ private long value = 0;
+
+ @Override
+ public Long next(Long aLong) {
+ return next();
+ }
+
+ @Override
+ public Long next() {
+ if (value < numRecords) {
+ return value++;
+ } else {
+ return null;
+ }
+ }
+ };
+
+ table.open(buildInput, probeInput);
+
+ try {
+ while (table.nextRecord()) {
+ MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+ while (matches.next() != null);
+ }
+ }
+ catch (RuntimeException e) {
+ if (!e.getMessage().contains("exceeded maximum number of recursions")) {
+ throw e;
+ }
+ }
+ finally {
+ table.close();
+ }
+
+ // make sure no temp files are left
+ checkNoTempFilesRemain(ioMan);
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ ioMan.shutdown();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+ ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ list.add(new MemorySegment(new byte[segmentSize]));
+ }
+ return list;
+ }
+
+ private static void checkNoTempFilesRemain(IOManager ioManager) {
+ for (File dir : ioManager.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("hash table did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
+}
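Note that the class stays main()-based in the manual-test package, so Surefire will not pick it up; it is meant to be launched by hand. If a CI smoke run were ever wanted, a hypothetical JUnit wrapper could delegate to it as sketched below (this wrapper is an assumption, not part of the commit, and as written it executes the full sweep, which takes a while):

    package org.apache.flink.test.manual;

    import org.junit.Test;

    public class HashTableRecordWidthSmokeTest {

        // Hypothetical wrapper: delegates to the manual check's main().
        // The sweep ranges are hard-coded there, so a genuinely quick
        // variant would first need those loops parameterized.
        @Test
        public void runManualCheck() throws Exception {
            HashTableRecordWidthCombinations.main(new String[0]);
        }
    }
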
[2/2] flink git commit: [FLINK-2584] [java api] Downgrade version of javakaffee kryo serializers, for compatibility with kryo 2.4
Posted by se...@apache.org.
[FLINK-2584] [java api] Downgrade version of javakaffee kryo serializers, for compatibility with kryo 2.4
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5016439
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5016439
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5016439
Branch: refs/heads/master
Commit: f5016439ba405eb3fb3098e66273a5162c7af58d
Parents: 38098cf
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 27 18:31:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 18:31:30 2015 +0200
----------------------------------------------------------------------
flink-java/pom.xml | 2 +-
pom.xml | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f5016439/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 6b818e0..49e6099 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -78,7 +78,7 @@ under the License.
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
- <version>0.36</version>
+ <version>0.27</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/f5016439/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b082162..dbba9fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,6 @@ under the License.
<guava.version>18.0</guava.version>
<akka.version>2.3.7</akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
- <kryoserialization.version>0.3.2</kryoserialization.version>
<chill.version>0.5.2</chill.version>
<asm.version>5.0.4</asm.version>
<tez.version>0.6.1</tez.version>
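
Background on the downgrade, hedged: newer de.javakaffee:kryo-serializers releases build against a newer Kryo, while Flink at this point still depends on the Kryo 2.4 line, so 0.27 is pinned as a version that still links against the older Kryo classes. The pom.xml change also drops the kryoserialization.version property from the parent pom, presumably because nothing references it anymore. The dependency exists so that the javakaffee serializers can be handed to Flink's Kryo fallback; a sketch of that kind of registration follows (the Joda-Time example is an assumption chosen for illustration; ExecutionConfig.addDefaultKryoSerializer is existing Flink API):

    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.joda.time.DateTime;

    public class KryoRegistrationExample {

        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Route Kryo's handling of Joda DateTime through the javakaffee
            // serializer; a kryo-serializers build compiled against a newer
            // Kryo would clash with the Kryo 2.4 classes on Flink's classpath.
            env.getConfig().addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        }
    }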