You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/06/13 09:10:44 UTC
[beam] branch master updated: Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free
This is an automated email from the ASF dual-hosted git repository.
gleb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 999bc5a Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free
999bc5a is described below
commit 999bc5af4ed67fc68e50ec17446cc6e7c0b0cf7d
Author: Neville Li <ne...@spotify.com>
AuthorDate: Thu Jun 13 05:10:16 2019 -0400
Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free
---
.../extensions/sorter/BufferedExternalSorter.java | 22 +-
.../beam/sdk/extensions/sorter/ExternalSorter.java | 196 ++------------
...ternalSorter.java => HadoopExternalSorter.java} | 52 +---
.../extensions/sorter/NativeExternalSorter.java | 77 ++++++
.../sdk/extensions/sorter/NativeFileSorter.java | 283 +++++++++++++++++++++
.../extensions/sorter/ExternalSorterBenchmark.java | 62 +++++
.../sdk/extensions/sorter/ExternalSorterTest.java | 56 +++-
7 files changed, 514 insertions(+), 234 deletions(-)
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
index 6032b00..4a60a99 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import java.io.IOException;
import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
import org.apache.beam.sdk.values.KV;
/**
@@ -29,17 +30,19 @@ import org.apache.beam.sdk.values.KV;
*/
public class BufferedExternalSorter implements Sorter {
public static Options options() {
- return new Options("/tmp", 100);
+ return new Options("/tmp", 100, SorterType.HADOOP);
}
/** Contains configuration for the sorter. */
public static class Options implements Serializable {
private final String tempLocation;
private final int memoryMB;
+ private final SorterType sorterType;
- private Options(String tempLocation, int memoryMB) {
+ private Options(String tempLocation, int memoryMB, SorterType sorterType) {
this.tempLocation = tempLocation;
this.memoryMB = memoryMB;
+ this.sorterType = sorterType;
}
/** Sets the path to a temporary location where the sorter writes intermediate files. */
@@ -48,7 +51,7 @@ public class BufferedExternalSorter implements Sorter {
!tempLocation.startsWith("gs://"),
"BufferedExternalSorter does not support GCS temporary location");
- return new Options(tempLocation, memoryMB);
+ return new Options(tempLocation, memoryMB, sorterType);
}
/** Returns the configured temporary location. */
@@ -66,13 +69,23 @@ public class BufferedExternalSorter implements Sorter {
// Hadoop's external sort stores the number of available memory bytes in an int, this prevents
// overflow
checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
- return new Options(tempLocation, memoryMB);
+ return new Options(tempLocation, memoryMB, sorterType);
}
/** Returns the configured size of the memory buffer. */
public int getMemoryMB() {
return memoryMB;
}
+
+ /** Sets the external sorter type. */
+ public Options withExternalSorterType(SorterType sorterType) {
+ return new Options(tempLocation, memoryMB, sorterType);
+ }
+
+ /** Returns the external sorter type. */
+ public SorterType getExternalSorterType() {
+ return sorterType;
+ }
}
private final ExternalSorter externalSorter;
@@ -89,6 +102,7 @@ public class BufferedExternalSorter implements Sorter {
ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options();
externalSorterOptions.setMemoryMB(options.getMemoryMB());
externalSorterOptions.setTempLocation(options.getTempLocation());
+ externalSorterOptions.setSorterType(options.getExternalSorterType());
InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options();
inMemorySorterOptions.setMemoryMB(options.getMemoryMB());
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
index 192b8ca..576d1fa 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
@@ -18,53 +18,24 @@
package org.apache.beam.sdk.extensions.sorter;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.JobConf;
-/** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */
-class ExternalSorter implements Sorter {
- private final Options options;
-
- /** Whether {@link #sort()} was already called. */
- private boolean sortCalled = false;
-
- /** SequenceFile Writer for writing all input data to a file. */
- private Writer writer;
-
- /** Sorter used to sort the input file. */
- private SequenceFile.Sorter sorter;
-
- /** Temporary directory for input and intermediate files. */
- private Path tempDir;
-
- /** The list of input files to be sorted. */
- private Path[] paths;
-
- private boolean initialized = false;
+/** Does an external sort of the provided values. */
+public abstract class ExternalSorter implements Sorter {
+ protected final Options options;
/** {@link Options} contains configuration of the sorter. */
public static class Options implements Serializable {
private String tempLocation = "/tmp";
private int memoryMB = 100;
+ private SorterType sorterType = SorterType.HADOOP;
+
+ /** Sorter type. */
+ public enum SorterType {
+ HADOOP,
+ NATIVE
+ }
/** Sets the path to a temporary location where the sorter writes intermediate files. */
public Options setTempLocation(String tempLocation) {
@@ -98,144 +69,27 @@ class ExternalSorter implements Sorter {
public int getMemoryMB() {
return memoryMB;
}
- }
-
- /** Returns a {@link Sorter} configured with the given {@link Options}. */
- public static ExternalSorter create(Options options) {
- return new ExternalSorter(options);
- }
-
- @Override
- public void add(KV<byte[], byte[]> record) throws IOException {
- checkState(!sortCalled, "Records can only be added before sort()");
-
- initHadoopSorter();
-
- BytesWritable key = new BytesWritable(record.getKey());
- BytesWritable value = new BytesWritable(record.getValue());
-
- writer.append(key, value);
- }
-
- @Override
- public Iterable<KV<byte[], byte[]>> sort() throws IOException {
- checkState(!sortCalled, "sort() can only be called once.");
- sortCalled = true;
-
- initHadoopSorter();
-
- writer.close();
-
- return new SortedRecordsIterable();
- }
-
- private ExternalSorter(Options options) {
- this.options = options;
- }
-
- /**
- * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive
- * (~20 ms on local machine). Only executed when necessary.
- */
- private void initHadoopSorter() throws IOException {
- if (!initialized) {
- tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString());
- paths = new Path[] {new Path(tempDir, "test.seq")};
- JobConf conf = new JobConf();
- // Sets directory for intermediate files created during merge of merge sort
- conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());
-
- writer =
- SequenceFile.createWriter(
- conf,
- Writer.valueClass(BytesWritable.class),
- Writer.keyClass(BytesWritable.class),
- Writer.file(paths[0]),
- Writer.compression(CompressionType.NONE));
-
- FileSystem fs = FileSystem.getLocal(conf);
- // Directory has to exist for Hadoop to recognize it as deletable on exit
- fs.mkdirs(tempDir);
- fs.deleteOnExit(tempDir);
-
- sorter =
- new SequenceFile.Sorter(
- fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
- sorter.setMemory(options.getMemoryMB() * 1024 * 1024);
-
- initialized = true;
+ /** Sets the sorter type. */
+ public Options setSorterType(SorterType sorterType) {
+ this.sorterType = sorterType;
+ return this;
}
- }
- /** An {@link Iterable} producing the iterators over sorted data. */
- private class SortedRecordsIterable implements Iterable<KV<byte[], byte[]>> {
- @Nonnull
- @Override
- public Iterator<KV<byte[], byte[]>> iterator() {
- return new SortedRecordsIterator();
+ /** Returns the sorter type. */
+ public SorterType getSorterType() {
+ return sorterType;
}
}
- /** An {@link Iterator} producing the sorted data. */
- private class SortedRecordsIterator implements Iterator<KV<byte[], byte[]>> {
- private RawKeyValueIterator iterator;
-
- /** Next {@link KV} to return from {@link #next()}. */
- private KV<byte[], byte[]> nextKV;
-
- SortedRecordsIterator() {
- try {
- this.iterator = sorter.sortAndIterate(paths, tempDir, false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- nextKV = KV.of(null, null); // A dummy value that will be overwritten by next().
- next();
- }
-
- @Override
- public boolean hasNext() {
- return nextKV != null;
- }
-
- @Override
- public KV<byte[], byte[]> next() {
- if (nextKV == null) {
- throw new NoSuchElementException();
- }
-
- KV<byte[], byte[]> current = nextKV;
-
- try {
- if (iterator.next()) {
- // Parse key from DataOutputBuffer.
- ByteArrayInputStream keyStream = new ByteArrayInputStream(iterator.getKey().getData());
- BytesWritable key = new BytesWritable();
- key.readFields(new DataInputStream(keyStream));
-
- // Parse value from ValueBytes.
- ByteArrayOutputStream valOutStream = new ByteArrayOutputStream();
- iterator.getValue().writeUncompressedBytes(new DataOutputStream(valOutStream));
- ByteArrayInputStream valInStream = new ByteArrayInputStream(valOutStream.toByteArray());
- BytesWritable value = new BytesWritable();
- value.readFields(new DataInputStream(valInStream));
-
- nextKV = KV.of(key.copyBytes(), value.copyBytes());
- } else {
- nextKV = null;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return current;
- }
+ /** Returns a {@link Sorter} configured with the given {@link Options}. */
+ public static ExternalSorter create(Options options) {
+ return options.getSorterType() == Options.SorterType.HADOOP
+ ? HadoopExternalSorter.create(options)
+ : NativeExternalSorter.create(options);
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Iterator does not support remove");
- }
+ ExternalSorter(Options options) {
+ this.options = options;
}
}
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
similarity index 79%
copy from sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
copy to sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
index 192b8ca..c692bf4 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sorter;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.ByteArrayInputStream;
@@ -25,7 +24,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
@@ -41,8 +39,7 @@ import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
/** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */
-class ExternalSorter implements Sorter {
- private final Options options;
+class HadoopExternalSorter extends ExternalSorter {
/** Whether {@link #sort()} was already called. */
private boolean sortCalled = false;
@@ -61,48 +58,9 @@ class ExternalSorter implements Sorter {
private boolean initialized = false;
- /** {@link Options} contains configuration of the sorter. */
- public static class Options implements Serializable {
- private String tempLocation = "/tmp";
- private int memoryMB = 100;
-
- /** Sets the path to a temporary location where the sorter writes intermediate files. */
- public Options setTempLocation(String tempLocation) {
- if (tempLocation.startsWith("gs://")) {
- throw new IllegalArgumentException("Sorter doesn't support GCS temporary location.");
- }
-
- this.tempLocation = tempLocation;
- return this;
- }
-
- /** Returns the configured temporary location. */
- public String getTempLocation() {
- return tempLocation;
- }
-
- /**
- * Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
- * 2048.
- */
- public Options setMemoryMB(int memoryMB) {
- checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
- // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
- // integer overflow
- checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
- this.memoryMB = memoryMB;
- return this;
- }
-
- /** Returns the configured size of the memory buffer. */
- public int getMemoryMB() {
- return memoryMB;
- }
- }
-
/** Returns a {@link Sorter} configured with the given {@link Options}. */
- public static ExternalSorter create(Options options) {
- return new ExternalSorter(options);
+ public static HadoopExternalSorter create(Options options) {
+ return new HadoopExternalSorter(options);
}
@Override
@@ -129,8 +87,8 @@ class ExternalSorter implements Sorter {
return new SortedRecordsIterable();
}
- private ExternalSorter(Options options) {
- this.options = options;
+ private HadoopExternalSorter(Options options) {
+ super(options);
}
/**
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java
new file mode 100644
index 0000000..812a96d
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import org.apache.beam.sdk.values.KV;
+
+/** Does an external sort of the provided values. */
+class NativeExternalSorter extends ExternalSorter {
+
+ /** Whether {@link #sort()} was already called. */
+ private boolean sortCalled = false;
+
+ /** Sorter used to sort the input. */
+ private NativeFileSorter sorter;
+
+ private boolean initialized = false;
+
+ /** Returns a {@link Sorter} configured with the given {@link Options}. */
+ public static NativeExternalSorter create(Options options) {
+ return new NativeExternalSorter(options);
+ }
+
+ @Override
+ public void add(KV<byte[], byte[]> record) throws IOException {
+ checkState(!sortCalled, "Records can only be added before sort()");
+
+ initSorter();
+
+ sorter.add(record.getKey(), record.getValue());
+ }
+
+ @Override
+ public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+ checkState(!sortCalled, "sort() can only be called once.");
+ sortCalled = true;
+
+ initSorter();
+
+ return sorter.sort();
+ }
+
+ private NativeExternalSorter(Options options) {
+ super(options);
+ }
+
+ /**
+ * Initializes the sorter. Does some local file system setup, and is somewhat expensive (~20 ms on
+ * local machine). Only executed when necessary.
+ */
+ private void initSorter() throws IOException {
+ if (!initialized) {
+ sorter =
+ new NativeFileSorter(
+ Paths.get(options.getTempLocation()), (long) options.getMemoryMB() * 1024 * 1024);
+ initialized = true;
+ }
+ }
+}
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java
new file mode 100644
index 0000000..4d6ff32
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.UnsignedBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External Sorter based on <a
+ * href="https://github.com/lemire/externalsortinginjava">lemire/externalsortinginjava</a>.
+ */
+class NativeFileSorter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NativeFileSorter.class);
+
+ private static final int MAX_TEMP_FILES = 1024;
+ private static final long OBJECT_OVERHEAD = getObjectOverhead();
+
+ private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
+ private static final Comparator<KV<byte[], byte[]>> KV_COMPARATOR =
+ (x, y) -> COMPARATOR.compare(x.getKey(), y.getKey());
+ private static final ByteArrayCoder CODER = ByteArrayCoder.of();
+
+ private final Path tempDir;
+ private final long maxMemory;
+ private final File dataFile;
+ private final OutputStream dataStream;
+
+ private boolean sortCalled = false;
+
+ /** Create a new file sorter. */
+ public NativeFileSorter(Path tempDir, long maxMemory) throws IOException {
+ this.tempDir = tempDir;
+ this.maxMemory = maxMemory;
+
+ this.dataFile = Files.createTempFile(tempDir, "input", "seq").toFile();
+ this.dataStream = new BufferedOutputStream(new FileOutputStream(dataFile));
+ dataFile.deleteOnExit();
+
+ LOG.debug("Created input file {}", dataFile);
+ }
+
+ /**
+ * Adds a given record to the sorter.
+ *
+ * <p>Records can only be added before calling {@link #sort()}.
+ */
+ public void add(byte[] key, byte[] value) throws IOException {
+ Preconditions.checkState(!sortCalled, "Records can only be added before sort()");
+ CODER.encode(key, dataStream);
+ CODER.encode(value, dataStream);
+ }
+
+ /**
+ * Sorts the added elements and returns an {@link Iterable} over the sorted elements.
+ *
+ * <p>Can be called at most once.
+ */
+ public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+ Preconditions.checkState(!sortCalled, "sort() can only be called once.");
+ sortCalled = true;
+
+ dataStream.close();
+
+ return mergeSortedFiles(sortInBatch());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Loads the file by blocks of records, sorts in memory, and writes the result to temporary files
+ * that have to be merged later.
+ */
+ private List<File> sortInBatch() throws IOException {
+ final long fileSize = Files.size(dataFile.toPath());
+ final long memory = maxMemory > 0 ? maxMemory : estimateAvailableMemory();
+ final long blockSize = estimateBestBlockSize(fileSize, memory); // in bytes
+ LOG.debug(
+ "Sort in batch with fileSize: {}, memory: {}, blockSize: {}", fileSize, memory, blockSize);
+
+ final List<File> files = new ArrayList<>();
+ InputStream inputStream = new BufferedInputStream(new FileInputStream(dataFile));
+ try {
+ final List<KV<byte[], byte[]>> tempList = new ArrayList<>();
+ KV<byte[], byte[]> kv = KV.of(null, null);
+ while (kv != null) {
+ long currentBlockSize = 0;
+ while ((currentBlockSize < blockSize) && (kv = readKeyValue(inputStream)) != null) {
+ // as long as you have enough memory
+ tempList.add(kv);
+ currentBlockSize += estimateSizeOf(kv);
+ }
+ files.add(sortAndSave(tempList));
+ tempList.clear();
+ }
+ } finally {
+ inputStream.close();
+ }
+ return files;
+ }
+
+ /** Sort a list and save it to a temporary file. */
+ private File sortAndSave(List<KV<byte[], byte[]>> tempList) throws IOException {
+ final File tempFile = Files.createTempFile(tempDir, "sort", "seq").toFile();
+ tempFile.deleteOnExit();
+ LOG.debug("Sort and save {}", tempFile);
+
+ tempList.sort(KV_COMPARATOR);
+
+ OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
+ try {
+ for (KV<byte[], byte[]> kv : tempList) {
+ CODER.encode(kv.getKey(), outputStream);
+ CODER.encode(kv.getValue(), outputStream);
+ }
+ } finally {
+ outputStream.close();
+ }
+ return tempFile;
+ }
+
+ /** Merges a list of temporary flat files. */
+ private Iterable<KV<byte[], byte[]>> mergeSortedFiles(List<File> files) {
+ return () -> {
+ final List<Iterator<KV<byte[], byte[]>>> iterators = new ArrayList<>();
+ for (File file : files) {
+ try {
+ iterators.add(iterateFile(file));
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ return Iterators.mergeSorted(iterators, KV_COMPARATOR);
+ };
+ }
+
+ /** Creates an {@link Iterator} over the key-value pairs in a file. */
+ private Iterator<KV<byte[], byte[]>> iterateFile(File file) throws FileNotFoundException {
+ final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+ return new Iterator<KV<byte[], byte[]>>() {
+ KV<byte[], byte[]> nextKv = readNext();
+
+ @Override
+ public boolean hasNext() {
+ return nextKv != null;
+ }
+
+ @Override
+ public KV<byte[], byte[]> next() {
+ KV<byte[], byte[]> r = nextKv;
+ nextKv = readNext();
+ return r;
+ }
+
+ private KV<byte[], byte[]> readNext() {
+ try {
+ return readKeyValue(inputStream);
+ } catch (EOFException e) {
+ return null;
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
+
+ /** Reads the next key-value pair from a file. */
+ private KV<byte[], byte[]> readKeyValue(InputStream inputStream) throws IOException {
+ try {
+ final byte[] keyBytes = CODER.decode(inputStream);
+ final byte[] valueBytes = CODER.decode(inputStream);
+ return KV.of(keyBytes, valueBytes);
+ } catch (EOFException e) {
+ return null;
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private int bufferSize(int numFiles) {
+ final long memory = maxMemory > 0 ? maxMemory : estimateAvailableMemory();
+ return (int) (memory / numFiles / 2);
+ }
+
+ /**
+ * This method calls the garbage collector and then returns the free memory. This avoids problems
+ * with applications where the GC hasn't reclaimed memory and reports no available memory.
+ */
+ @SuppressFBWarnings("DM_GC")
+ private static long estimateAvailableMemory() {
+ System.gc();
+ // http://stackoverflow.com/questions/12807797/java-get-available-memory
+ final Runtime r = Runtime.getRuntime();
+ final long allocatedMemory = r.totalMemory() - r.freeMemory();
+ return r.maxMemory() - allocatedMemory;
+ }
+
+ /**
+ * We divide the file into small blocks. If the blocks are too small, we shall create too many
+ * temporary files. If they are too big, we shall be using too much memory.
+ *
+ * @param sizeOfFile how much data (in bytes) can we expect
+ * @param maxMemory Maximum memory to use (in bytes)
+ */
+ private static long estimateBestBlockSize(final long sizeOfFile, final long maxMemory) {
+ // we don't want to open up much more than MAX_TEMP_FILES temporary files, better run out of
+ // memory first.
+ long blockSize = sizeOfFile / MAX_TEMP_FILES + (sizeOfFile % MAX_TEMP_FILES == 0 ? 0 : 1);
+
+ // on the other hand, we don't want to create many temporary files for naught. If blockSize is
+ // smaller than half the free memory, grow it.
+ if (blockSize < maxMemory / 2) {
+ blockSize = maxMemory / 2;
+ }
+ return blockSize;
+ }
+
+ private static long getObjectOverhead() {
+ // By default we assume 64 bit JVM
+ // (defensive approach since we will get larger estimations in case we are not sure)
+ boolean is64BitJvm = true;
+ // check the system property "sun.arch.data.model"
+ // not very safe, as it might not work for all JVM implementations
+ // nevertheless the worst thing that might happen is that the JVM is 32bit
+ // but we assume its 64bit, so we will be counting a few extra bytes per string object
+ // no harm done here since this is just an approximation.
+ String arch = System.getProperty("sun.arch.data.model");
+ if (arch != null && arch.contains("32")) {
+ // If exists and is 32 bit then we assume a 32bit JVM
+ is64BitJvm = false;
+ }
+ // The sizes below are a bit rough as we don't take into account
+ // advanced JVM options such as compressed oops
+ // however if our calculation is not accurate it'll be a bit over
+ // so there is no danger of an out of memory error because of this.
+ long objectHeader = is64BitJvm ? 16 : 8;
+ long arrayHeader = is64BitJvm ? 24 : 12;
+ long objectRef = is64BitJvm ? 8 : 4;
+ return objectHeader + (objectRef + arrayHeader) * 2;
+ }
+
+ private static long estimateSizeOf(KV<byte[], byte[]> kv) {
+ return kv.getKey().length + kv.getValue().length + OBJECT_OVERHEAD;
+ }
+}
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java
new file mode 100644
index 0000000..620711f
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
+import org.apache.beam.sdk.values.KV;
+
+/** {@link ExternalSorter} benchmarks. */
+public class ExternalSorterBenchmark {
+ private static final int N = 1000 * 1000; // 1m * (36 * 2) ~= 72MB per 1 million KVs
+
+ public static void main(String[] args) throws IOException {
+ File tempDirectory = Files.createTempDirectory("sorter").toFile();
+ tempDirectory.deleteOnExit();
+
+ ExternalSorter.Options options =
+ new ExternalSorter.Options().setMemoryMB(32).setTempLocation(tempDirectory.toString());
+
+ options.setSorterType(SorterType.HADOOP);
+ benchmark(ExternalSorter.create(options));
+
+ options.setSorterType(SorterType.NATIVE);
+ benchmark(ExternalSorter.create(options));
+ }
+
+ private static void benchmark(Sorter sorter) throws IOException {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < N; i++) {
+ sorter.add(
+ KV.of(
+ UUID.randomUUID().toString().getBytes(Charset.defaultCharset()),
+ UUID.randomUUID().toString().getBytes(Charset.defaultCharset())));
+ }
+ int i = 0;
+ for (KV<byte[], byte[]> ignored : sorter.sort()) {
+ i++;
+ }
+ long end = System.currentTimeMillis();
+ System.out.println(
+ String.format("%s: %fs", sorter.getClass().getSimpleName(), (end - start) / 1000.0));
+ }
+}
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
index fb0a7b8..f3b391e 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
@@ -25,20 +25,30 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/** Tests for Sorter. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
public class ExternalSorterTest {
@Rule public ExpectedException thrown = ExpectedException.none();
private static Path tmpLocation;
+ public ExternalSorterTest(SorterType sorterType) {
+ this.sorterType = sorterType;
+ }
+
+ private final SorterType sorterType;
+
@BeforeClass
public static void setupTempDir() throws IOException {
tmpLocation = Files.createTempDirectory("tmp");
@@ -64,32 +74,46 @@ public class ExternalSorterTest {
});
}
+ @Parameters
+ public static Collection<SorterType[]> data() {
+ return Arrays.asList(
+ new SorterType[] {SorterType.HADOOP}, new SorterType[] {SorterType.NATIVE});
+ }
+
@Test
public void testEmpty() throws Exception {
SorterTestUtils.testEmpty(
ExternalSorter.create(
- new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)));
}
@Test
public void testSingleElement() throws Exception {
SorterTestUtils.testSingleElement(
ExternalSorter.create(
- new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)));
}
@Test
public void testEmptyKeyValueElement() throws Exception {
SorterTestUtils.testEmptyKeyValueElement(
ExternalSorter.create(
- new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)));
}
@Test
public void testMultipleIterations() throws Exception {
SorterTestUtils.testMultipleIterations(
ExternalSorter.create(
- new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)));
}
@Test
@@ -97,7 +121,9 @@ public class ExternalSorterTest {
SorterTestUtils.testRandom(
() ->
ExternalSorter.create(
- new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)),
1,
1000000);
}
@@ -105,7 +131,10 @@ public class ExternalSorterTest {
@Test
public void testAddAfterSort() throws Exception {
SorterTestUtils.testAddAfterSort(
- ExternalSorter.create(new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+ ExternalSorter.create(
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)),
thrown);
fail();
}
@@ -113,7 +142,10 @@ public class ExternalSorterTest {
@Test
public void testSortTwice() throws Exception {
SorterTestUtils.testSortTwice(
- ExternalSorter.create(new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+ ExternalSorter.create(
+ new ExternalSorter.Options()
+ .setTempLocation(tmpLocation.toString())
+ .setSorterType(sorterType)),
thrown);
fail();
}
@@ -122,7 +154,7 @@ public class ExternalSorterTest {
public void testNegativeMemory() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
- ExternalSorter.Options options = new ExternalSorter.Options();
+ ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
options.setMemoryMB(-1);
}
@@ -130,7 +162,7 @@ public class ExternalSorterTest {
public void testZeroMemory() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
- ExternalSorter.Options options = new ExternalSorter.Options();
+ ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
options.setMemoryMB(0);
}
@@ -138,7 +170,7 @@ public class ExternalSorterTest {
public void testMemoryTooLarge() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be less than 2048");
- ExternalSorter.Options options = new ExternalSorter.Options();
+ ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
options.setMemoryMB(2048);
}
}