You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/18 17:27:34 UTC

incubator-flink git commit: [FLINK-1336] [core] Fix bug in StringValue binary copy method

Repository: incubator-flink
Updated Branches:
  refs/heads/master 94cfe1487 -> 6e9b2848d


[FLINK-1336] [core] Fix bug in StringValue binary copy method


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6e9b2848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6e9b2848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6e9b2848

Branch: refs/heads/master
Commit: 6e9b2848d5fabace5c6ef491c87c562eed9b5f43
Parents: 94cfe14
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Dec 18 15:28:54 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 18 15:28:54 2014 +0100

----------------------------------------------------------------------
 .../OutputViewDataOutputStreamWrapper.java      |  14 +-
 .../org/apache/flink/types/StringValue.java     |  11 +-
 .../base/StringValueSerializerTest.java         |   3 +-
 .../types/StringValueSerializationTest.java     | 189 +++++++++
 .../sort/MassiveStringValueSortingITCase.java   | 392 +++++++++++++++++++
 5 files changed, 600 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e9b2848/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
index aa27315..ffe36c0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
@@ -18,16 +18,28 @@
 
 package org.apache.flink.core.memory;
 
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
-public class OutputViewDataOutputStreamWrapper implements DataOutputView {
+public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closeable {
+	
 	private final DataOutputStream out;
 
 	public OutputViewDataOutputStreamWrapper(DataOutputStream out){
 		this.out = out;
 	}
+	
+	
+	public void flush() throws IOException {
+		out.flush();
+	}
+	
+	@Override
+	public void close() throws IOException {
+		out.close();
+	}
 
 	@Override
 	public void skipBytesToWrite(int numBytes) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e9b2848/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index 10c6b4c..d3317e9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
 import java.io.DataInput;
@@ -37,7 +36,6 @@ import org.apache.flink.core.memory.MemorySegment;
  * helps to increase the performance, as string objects are rather heavy-weight objects and incur a lot of garbage
  * collection overhead, if created and destroyed in masses.
  * 
- * 
  * @see org.apache.flink.types.Key
  * @see org.apache.flink.types.NormalizableKey
  * @see java.lang.String
@@ -699,16 +697,15 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 				target.writeByte(curr);
 			}
 			len |= curr << shift;
+			target.writeByte(curr);
 		}
 
 		for (int i = 0; i < len; i++) {
 			int c = in.readUnsignedByte();
 			target.writeByte(c);
-			if (c >= HIGH_BIT) {
-				int curr;
-				while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
-					target.writeByte(curr);
-				}
+			while (c >= HIGH_BIT) {
+				c = in.readUnsignedByte();
+				target.writeByte(c);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e9b2848/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
index c20cd92..a2433bd 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
@@ -50,6 +50,7 @@ public class StringValueSerializerTest extends SerializerTestBase<StringValue> {
 				new StringValue("bcd"),
 				new StringValue("jbmbmner8 jhk hj \n \t üäßß@µ"),
 				new StringValue(""),
-				new StringValue("non-empty")};
+				new StringValue("non-empty"),
+				new StringValue("http://some-uri.com/that/is/a/common/prefix/to/all(((cmNH`0R)H<tnLa:/;Q,igWY2EdwW^W7T3H6NMRoqR[O2TqQ@SbGKc(:0XOXq-5]ndm-R8?=,o?AW+9Pi_v4eON=Mpje7N4n*-nhFWKn>Sn0cGMlnDquY@-F:QY@-UZ.-//*OL*8\\SIpiZa)tefalZ99-P_-WFIaKPeGbkQ^iRgd,YYkn7:jBAW::PqAYtgl73dTaJ2CIT:11HJ70<ATOXZ]c6b_7EgQU,@uq+SMa=7Z]kg/OZ>TGduw>D7Lu[nEj_l=Ucwo5BQtBESh/4V>N9nj/pDLw[NM)a=ac6R-(FM2U+dwROMUH;);Y=")};
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e9b2848/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java b/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
new file mode 100644
index 0000000..f64ced5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+/**
+ * Test for the serialization of StringValue.
+ */
+public class StringValueSerializationTest {
+
+	private final Random rnd = new Random(2093486528937460234L);
+	
+	
+	@Test
+	public void testNonNullValues() {
+		try {
+			String[] testStrings = new String[] {
+				"a", "", "bcd", "jbmbmner8 jhk hj \n \t üäßß@µ", "", "non-empty"
+			};
+			
+			testSerialization(testStrings);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testLongValues() {
+		try {
+			String[] testStrings = new String[] {
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2)
+			};
+			
+			testSerialization(testStrings);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMixedValues() {
+		try {
+			String[] testStrings = new String[] {
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				"",
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				"",
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				""
+			};
+			
+			testSerialization(testStrings);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBinaryCopyOfLongStrings() {
+		try {
+			String[] testStrings = new String[] {
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				"",
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				"",
+				StringUtils.getRandomString(rnd, 10000, 1024 * 1024 * 2),
+				""
+			};
+			
+			testCopy(testStrings);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	public static final void testSerialization(String[] values) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+		OutputViewDataOutputStreamWrapper serializer = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+		
+		for (String value : values) {
+			StringValue sv = new StringValue(value);
+			sv.write(serializer);
+		}
+		
+		serializer.close();
+		baos.close();
+		
+		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		InputViewDataInputStreamWrapper deserializer = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		
+		int num = 0;
+		while (bais.available() > 0) {
+			StringValue deser = new StringValue();
+			deser.read(deserializer);
+			
+			assertEquals("DeserializedString differs from original string.", values[num], deser.getValue());
+			num++;
+		}
+		
+		assertEquals("Wrong number of deserialized values", values.length, num);
+	}
+
+	public static final void testCopy(String[] values) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+		OutputViewDataOutputStreamWrapper serializer = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+		
+		StringValue sValue = new StringValue();
+		
+		for (String value : values) {
+			sValue.setValue(value);
+			sValue.write(serializer);
+		}
+		
+		serializer.close();
+		baos.close();
+		
+		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		InputViewDataInputStreamWrapper source = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		
+		ByteArrayOutputStream targetOutput = new ByteArrayOutputStream(4096);
+		OutputViewDataOutputStreamWrapper target = new OutputViewDataOutputStreamWrapper(new DataOutputStream(targetOutput));
+		
+		for (int i = 0; i < values.length; i++) {
+			sValue.copy(source, target);
+		}
+		
+		ByteArrayInputStream validateInput = new ByteArrayInputStream(targetOutput.toByteArray());
+		InputViewDataInputStreamWrapper validate = new InputViewDataInputStreamWrapper(new DataInputStream(validateInput));
+		
+		int num = 0;
+		while (validateInput.available() > 0) {
+			sValue.read(validate);
+			
+			assertEquals("DeserializedString differs from original string.", values[num], sValue.getValue());
+			num++;
+		}
+		
+		assertEquals("Wrong number of deserialized values", values.length, num);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6e9b2848/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
new file mode 100644
index 0000000..f2a3fc7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
+import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+
+public class MassiveStringValueSortingITCase {
+
+	private static final long SEED = 347569784659278346L;
+	
+	public void testStringValueSorting() {
+		File input = null;
+		File sorted = null;
+
+		try {
+			// the source file
+			input = generateFileWithStrings(300000, "http://some-uri.com/that/is/a/common/prefix/to/all");
+			
+			// the sorted file
+			sorted = File.createTempFile("sorted_strings", "txt");
+			
+			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+			
+			Process p = null;
+			try {
+				p = Runtime.getRuntime().exec(command);
+				int retCode = p.waitFor();
+				if (retCode != 0) {
+					throw new Exception("Command failed with return code " + retCode);
+				}
+				p = null;
+			} finally {
+				if (p != null) {
+					p.destroy();
+				}
+			}
+			
+			// sort the data
+			UnilateralSortMerger<StringValue> sorter = null;
+			BufferedReader reader = null;
+			BufferedReader verifyReader = null;
+			
+			try {
+				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				IOManager ioMan = new IOManagerAsync();
+					
+				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
+				TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class);
+				
+				reader = new BufferedReader(new FileReader(input));
+				MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader);
+				
+				sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(),
+						new RuntimeStatelessSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f);
+
+				MutableObjectIterator<StringValue> sortedData = sorter.getIterator();
+				
+				reader.close();
+				
+				// verify
+				verifyReader = new BufferedReader(new FileReader(sorted));
+				String nextVerify;
+				StringValue nextFromFlinkSort = new StringValue();
+				
+				while ((nextVerify = verifyReader.readLine()) != null) {
+					nextFromFlinkSort = sortedData.next(nextFromFlinkSort);
+					
+					Assert.assertNotNull(nextFromFlinkSort);
+					Assert.assertEquals(nextVerify, nextFromFlinkSort.getValue());
+				}
+			}
+			finally {
+				if (reader != null) {
+					reader.close();
+				}
+				if (verifyReader != null) {
+					verifyReader.close();
+				}
+				if (sorter != null) {
+					sorter.close();
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+		finally {
+			if (input != null) {
+				input.delete();
+			}
+			if (sorted != null) {
+				sorted.delete();
+			}
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void testStringValueTuplesSorting() {
+		final int NUM_STRINGS = 300000;
+		
+		File input = null;
+		File sorted = null;
+
+		try {
+			// the source file
+			input = generateFileWithStringTuples(NUM_STRINGS, "http://some-uri.com/that/is/a/common/prefix/to/all");
+			
+			// the sorted file
+			sorted = File.createTempFile("sorted_strings", "txt");
+			
+			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+			
+			Process p = null;
+			try {
+				p = Runtime.getRuntime().exec(command);
+				int retCode = p.waitFor();
+				if (retCode != 0) {
+					throw new Exception("Command failed with return code " + retCode);
+				}
+				p = null;
+			} finally {
+				if (p != null) {
+					p.destroy();
+				}
+			}
+			
+			// sort the data
+			UnilateralSortMerger<Tuple2<StringValue, StringValue[]>> sorter = null;
+			BufferedReader reader = null;
+			BufferedReader verifyReader = null;
+			
+			try {
+				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
+				IOManager ioMan = new IOManagerAsync();
+					
+				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>) (TupleTypeInfo<?>)
+						TypeInfoParser.parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>");
+
+				TypeSerializer<Tuple2<StringValue, StringValue[]>> serializer = typeInfo.createSerializer();
+				TypeComparator<Tuple2<StringValue, StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0);
+				
+				reader = new BufferedReader(new FileReader(input));
+				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader);
+				
+				sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
+						new RuntimeStatelessSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+
+				
+				
+				// use this part to verify that all is good when sorting in memory
+				
+//				List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024));
+//				NormalizedKeySorter<Tuple2<String, String[]>> nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory);
+//
+//				{
+//					Tuple2<String, String[]> wi = new Tuple2<String, String[]>("", new String[0]);
+//					while ((wi = inputIterator.next(wi)) != null) {
+//						Assert.assertTrue(nks.write(wi));
+//					}
+//					
+//					new QuickSort().sort(nks);
+//				}
+//				
+//				MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator();
+				
+				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> sortedData = sorter.getIterator();
+				reader.close();
+				
+				// verify
+				verifyReader = new BufferedReader(new FileReader(sorted));
+				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> verifyIterator = new StringValueTupleReaderMutableObjectIterator(verifyReader);
+				
+				Tuple2<StringValue, StringValue[]> nextVerify = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]);
+				Tuple2<StringValue, StringValue[]> nextFromFlinkSort = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]);
+				
+				int num = 0;
+				
+				while ((nextVerify = verifyIterator.next(nextVerify)) != null) {
+					num++;
+					
+					nextFromFlinkSort = sortedData.next(nextFromFlinkSort);
+					Assert.assertNotNull(nextFromFlinkSort);
+					
+					Assert.assertEquals(nextVerify.f0, nextFromFlinkSort.f0);
+					Assert.assertArrayEquals(nextVerify.f1, nextFromFlinkSort.f1);
+				}
+				
+				Assert.assertNull(sortedData.next(nextFromFlinkSort));
+				Assert.assertEquals(NUM_STRINGS, num);
+
+			}
+			finally {
+				if (reader != null) {
+					reader.close();
+				}
+				if (verifyReader != null) {
+					verifyReader.close();
+				}
+				if (sorter != null) {
+					sorter.close();
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+		finally {
+			if (input != null) {
+				input.delete();
+			}
+			if (sorted != null) {
+				sorted.delete();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class StringValueReaderMutableObjectIterator implements MutableObjectIterator<StringValue> {
+		
+		private final BufferedReader reader;
+
+		public StringValueReaderMutableObjectIterator(BufferedReader reader) {
+			this.reader = reader;
+		}
+		
+		@Override
+		public StringValue next(StringValue reuse) throws IOException {
+			String line = reader.readLine();
+			
+			if (line == null) {
+				return null;
+			}
+			
+			reuse.setValue(line);
+			return reuse;
+		}
+	}
+	
+	private static final class StringValueTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> {
+		
+		private final BufferedReader reader;
+
+		public StringValueTupleReaderMutableObjectIterator(BufferedReader reader) {
+			this.reader = reader;
+		}
+		
+		@Override
+		public Tuple2<StringValue, StringValue[]> next(Tuple2<StringValue, StringValue[]> reuse) throws IOException {
+			String line = reader.readLine();
+			if (line == null) {
+				return null;
+			}
+			
+			String[] parts = line.split(" ");
+			reuse.f0.setValue(parts[0]);
+			reuse.f1 = new StringValue[parts.length];
+			
+			for (int i = 0; i < parts.length; i++) {
+				reuse.f1[i] = new StringValue(parts[i]);
+			}
+			
+			return reuse;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private File generateFileWithStrings(int numStrings, String prefix) throws IOException {
+		final Random rnd = new Random(SEED);
+		
+		final StringBuilder bld = new StringBuilder();
+		final int resetValue = prefix.length();
+		
+		bld.append(prefix);
+		
+		File f = File.createTempFile("strings", "txt");
+		BufferedWriter wrt = null;
+		try {
+			wrt = new BufferedWriter(new FileWriter(f));
+		
+			for (int i = 0 ; i < numStrings; i++) {
+				bld.setLength(resetValue);
+				
+				int len = rnd.nextInt(20) + 300;
+				for (int k = 0; k < len; k++) {
+					char c = (char) (rnd.nextInt(80) + 40);
+					bld.append(c);
+				}
+				
+				String str = bld.toString();
+				wrt.write(str);
+				wrt.newLine();
+			}
+		} finally {
+			wrt.close();
+		}
+		
+		return f;
+	}
+	
+	private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException {
+		final Random rnd = new Random(SEED);
+		
+		final StringBuilder bld = new StringBuilder();
+
+		File f = File.createTempFile("strings", "txt");
+		BufferedWriter wrt = null;
+		try {
+			wrt = new BufferedWriter(new FileWriter(f));
+		
+			for (int i = 0 ; i < numStrings; i++) {
+				bld.setLength(0);
+				
+				int numComps = rnd.nextInt(5) + 1;
+				
+				for (int z = 0; z < numComps; z++) {
+					if (z > 0) {
+						bld.append(' ');
+					}
+					bld.append(prefix);
+				
+					int len = rnd.nextInt(20) + 10;
+					for (int k = 0; k < len; k++) {
+						char c = (char) (rnd.nextInt(80) + 40);
+						bld.append(c);
+					}
+				}
+				
+				String str = bld.toString();
+				
+				wrt.write(str);
+				wrt.newLine();
+			}
+		} finally {
+			wrt.close();
+		}
+		
+		return f;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static void main(String[] args) {
+		new MassiveStringValueSortingITCase().testStringValueSorting();
+		new MassiveStringValueSortingITCase().testStringValueTuplesSorting();
+	}
+}