You are viewing a plain text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/06/13 09:10:44 UTC

[beam] branch master updated: Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free

This is an automated email from the ASF dual-hosted git repository.

gleb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 999bc5a  Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free
999bc5a is described below

commit 999bc5af4ed67fc68e50ec17446cc6e7c0b0cf7d
Author: Neville Li <ne...@spotify.com>
AuthorDate: Thu Jun 13 05:10:16 2019 -0400

    Merge pull request #8552: [BEAM-7268] make sorter extension Hadoop-free
---
 .../extensions/sorter/BufferedExternalSorter.java  |  22 +-
 .../beam/sdk/extensions/sorter/ExternalSorter.java | 196 ++------------
 ...ternalSorter.java => HadoopExternalSorter.java} |  52 +---
 .../extensions/sorter/NativeExternalSorter.java    |  77 ++++++
 .../sdk/extensions/sorter/NativeFileSorter.java    | 283 +++++++++++++++++++++
 .../extensions/sorter/ExternalSorterBenchmark.java |  62 +++++
 .../sdk/extensions/sorter/ExternalSorterTest.java  |  56 +++-
 7 files changed, 514 insertions(+), 234 deletions(-)

diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
index 6032b00..4a60a99 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 
 import java.io.IOException;
 import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -29,17 +30,19 @@ import org.apache.beam.sdk.values.KV;
  */
 public class BufferedExternalSorter implements Sorter {
   public static Options options() {
-    return new Options("/tmp", 100);
+    return new Options("/tmp", 100, SorterType.HADOOP);
   }
 
   /** Contains configuration for the sorter. */
   public static class Options implements Serializable {
     private final String tempLocation;
     private final int memoryMB;
+    private final SorterType sorterType;
 
-    private Options(String tempLocation, int memoryMB) {
+    private Options(String tempLocation, int memoryMB, SorterType sorterType) {
       this.tempLocation = tempLocation;
       this.memoryMB = memoryMB;
+      this.sorterType = sorterType;
     }
 
     /** Sets the path to a temporary location where the sorter writes intermediate files. */
@@ -48,7 +51,7 @@ public class BufferedExternalSorter implements Sorter {
           !tempLocation.startsWith("gs://"),
           "BufferedExternalSorter does not support GCS temporary location");
 
-      return new Options(tempLocation, memoryMB);
+      return new Options(tempLocation, memoryMB, sorterType);
     }
 
     /** Returns the configured temporary location. */
@@ -66,13 +69,23 @@ public class BufferedExternalSorter implements Sorter {
       // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
       // overflow
       checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
-      return new Options(tempLocation, memoryMB);
+      return new Options(tempLocation, memoryMB, sorterType);
     }
 
     /** Returns the configured size of the memory buffer. */
     public int getMemoryMB() {
       return memoryMB;
     }
+
+    /** Sets the external sorter type. */
+    public Options withExternalSorterType(SorterType sorterType) {
+      return new Options(tempLocation, memoryMB, sorterType);
+    }
+
+    /** Returns the external sorter type. */
+    public SorterType getExternalSorterType() {
+      return sorterType;
+    }
   }
 
   private final ExternalSorter externalSorter;
@@ -89,6 +102,7 @@ public class BufferedExternalSorter implements Sorter {
     ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options();
     externalSorterOptions.setMemoryMB(options.getMemoryMB());
     externalSorterOptions.setTempLocation(options.getTempLocation());
+    externalSorterOptions.setSorterType(options.getExternalSorterType());
 
     InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options();
     inMemorySorterOptions.setMemoryMB(options.getMemoryMB());
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
index 192b8ca..576d1fa 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
@@ -18,53 +18,24 @@
 package org.apache.beam.sdk.extensions.sorter;
 
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.JobConf;
 
-/** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */
-class ExternalSorter implements Sorter {
-  private final Options options;
-
-  /** Whether {@link #sort()} was already called. */
-  private boolean sortCalled = false;
-
-  /** SequenceFile Writer for writing all input data to a file. */
-  private Writer writer;
-
-  /** Sorter used to sort the input file. */
-  private SequenceFile.Sorter sorter;
-
-  /** Temporary directory for input and intermediate files. */
-  private Path tempDir;
-
-  /** The list of input files to be sorted. */
-  private Path[] paths;
-
-  private boolean initialized = false;
+/** Does an external sort of the provided values. */
+public abstract class ExternalSorter implements Sorter {
+  protected final Options options;
 
   /** {@link Options} contains configuration of the sorter. */
   public static class Options implements Serializable {
     private String tempLocation = "/tmp";
     private int memoryMB = 100;
+    private SorterType sorterType = SorterType.HADOOP;
+
+    /** Sorter type. */
+    public enum SorterType {
+      HADOOP,
+      NATIVE
+    }
 
     /** Sets the path to a temporary location where the sorter writes intermediate files. */
     public Options setTempLocation(String tempLocation) {
@@ -98,144 +69,27 @@ class ExternalSorter implements Sorter {
     public int getMemoryMB() {
       return memoryMB;
     }
-  }
-
-  /** Returns a {@link Sorter} configured with the given {@link Options}. */
-  public static ExternalSorter create(Options options) {
-    return new ExternalSorter(options);
-  }
-
-  @Override
-  public void add(KV<byte[], byte[]> record) throws IOException {
-    checkState(!sortCalled, "Records can only be added before sort()");
-
-    initHadoopSorter();
-
-    BytesWritable key = new BytesWritable(record.getKey());
-    BytesWritable value = new BytesWritable(record.getValue());
-
-    writer.append(key, value);
-  }
-
-  @Override
-  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
-    checkState(!sortCalled, "sort() can only be called once.");
-    sortCalled = true;
-
-    initHadoopSorter();
-
-    writer.close();
-
-    return new SortedRecordsIterable();
-  }
-
-  private ExternalSorter(Options options) {
-    this.options = options;
-  }
-
-  /**
-   * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive
-   * (~20 ms on local machine). Only executed when necessary.
-   */
-  private void initHadoopSorter() throws IOException {
-    if (!initialized) {
-      tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString());
-      paths = new Path[] {new Path(tempDir, "test.seq")};
 
-      JobConf conf = new JobConf();
-      // Sets directory for intermediate files created during merge of merge sort
-      conf.set("io.seqfile.local.dir", tempDir.toUri().getPath());
-
-      writer =
-          SequenceFile.createWriter(
-              conf,
-              Writer.valueClass(BytesWritable.class),
-              Writer.keyClass(BytesWritable.class),
-              Writer.file(paths[0]),
-              Writer.compression(CompressionType.NONE));
-
-      FileSystem fs = FileSystem.getLocal(conf);
-      // Directory has to exist for Hadoop to recognize it as deletable on exit
-      fs.mkdirs(tempDir);
-      fs.deleteOnExit(tempDir);
-
-      sorter =
-          new SequenceFile.Sorter(
-              fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
-      sorter.setMemory(options.getMemoryMB() * 1024 * 1024);
-
-      initialized = true;
+    /** Sets the sorter type. */
+    public Options setSorterType(SorterType sorterType) {
+      this.sorterType = sorterType;
+      return this;
     }
-  }
 
-  /** An {@link Iterable} producing the iterators over sorted data. */
-  private class SortedRecordsIterable implements Iterable<KV<byte[], byte[]>> {
-    @Nonnull
-    @Override
-    public Iterator<KV<byte[], byte[]>> iterator() {
-      return new SortedRecordsIterator();
+    /** Returns the sorter type. */
+    public SorterType getSorterType() {
+      return sorterType;
     }
   }
 
-  /** An {@link Iterator} producing the sorted data. */
-  private class SortedRecordsIterator implements Iterator<KV<byte[], byte[]>> {
-    private RawKeyValueIterator iterator;
-
-    /** Next {@link KV} to return from {@link #next()}. */
-    private KV<byte[], byte[]> nextKV;
-
-    SortedRecordsIterator() {
-      try {
-        this.iterator = sorter.sortAndIterate(paths, tempDir, false);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-
-      nextKV = KV.of(null, null); // A dummy value that will be overwritten by next().
-      next();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return nextKV != null;
-    }
-
-    @Override
-    public KV<byte[], byte[]> next() {
-      if (nextKV == null) {
-        throw new NoSuchElementException();
-      }
-
-      KV<byte[], byte[]> current = nextKV;
-
-      try {
-        if (iterator.next()) {
-          // Parse key from DataOutputBuffer.
-          ByteArrayInputStream keyStream = new ByteArrayInputStream(iterator.getKey().getData());
-          BytesWritable key = new BytesWritable();
-          key.readFields(new DataInputStream(keyStream));
-
-          // Parse value from ValueBytes.
-          ByteArrayOutputStream valOutStream = new ByteArrayOutputStream();
-          iterator.getValue().writeUncompressedBytes(new DataOutputStream(valOutStream));
-          ByteArrayInputStream valInStream = new ByteArrayInputStream(valOutStream.toByteArray());
-          BytesWritable value = new BytesWritable();
-          value.readFields(new DataInputStream(valInStream));
-
-          nextKV = KV.of(key.copyBytes(), value.copyBytes());
-        } else {
-          nextKV = null;
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-
-      return current;
-    }
+  /** Returns a {@link Sorter} configured with the given {@link Options}. */
+  public static ExternalSorter create(Options options) {
+    return options.getSorterType() == Options.SorterType.HADOOP
+        ? HadoopExternalSorter.create(options)
+        : NativeExternalSorter.create(options);
+  }
 
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Iterator does not support remove");
-    }
+  ExternalSorter(Options options) {
+    this.options = options;
   }
 }
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
similarity index 79%
copy from sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
copy to sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
index 192b8ca..c692bf4 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/HadoopExternalSorter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sorter;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
 import java.io.ByteArrayInputStream;
@@ -25,7 +24,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.UUID;
@@ -41,8 +39,7 @@ import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 
 /** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */
-class ExternalSorter implements Sorter {
-  private final Options options;
+class HadoopExternalSorter extends ExternalSorter {
 
   /** Whether {@link #sort()} was already called. */
   private boolean sortCalled = false;
@@ -61,48 +58,9 @@ class ExternalSorter implements Sorter {
 
   private boolean initialized = false;
 
-  /** {@link Options} contains configuration of the sorter. */
-  public static class Options implements Serializable {
-    private String tempLocation = "/tmp";
-    private int memoryMB = 100;
-
-    /** Sets the path to a temporary location where the sorter writes intermediate files. */
-    public Options setTempLocation(String tempLocation) {
-      if (tempLocation.startsWith("gs://")) {
-        throw new IllegalArgumentException("Sorter doesn't support GCS temporary location.");
-      }
-
-      this.tempLocation = tempLocation;
-      return this;
-    }
-
-    /** Returns the configured temporary location. */
-    public String getTempLocation() {
-      return tempLocation;
-    }
-
-    /**
-     * Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
-     * 2048.
-     */
-    public Options setMemoryMB(int memoryMB) {
-      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
-      // Hadoop's external sort stores the number of available memory bytes in an int, this prevents
-      // integer overflow
-      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
-      this.memoryMB = memoryMB;
-      return this;
-    }
-
-    /** Returns the configured size of the memory buffer. */
-    public int getMemoryMB() {
-      return memoryMB;
-    }
-  }
-
   /** Returns a {@link Sorter} configured with the given {@link Options}. */
-  public static ExternalSorter create(Options options) {
-    return new ExternalSorter(options);
+  public static HadoopExternalSorter create(Options options) {
+    return new HadoopExternalSorter(options);
   }
 
   @Override
@@ -129,8 +87,8 @@ class ExternalSorter implements Sorter {
     return new SortedRecordsIterable();
   }
 
-  private ExternalSorter(Options options) {
-    this.options = options;
+  private HadoopExternalSorter(Options options) {
+    super(options);
   }
 
   /**
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java
new file mode 100644
index 0000000..812a96d
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeExternalSorter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import org.apache.beam.sdk.values.KV;
+
+/** External sort of the provided values, backed by {@link NativeFileSorter} (no Hadoop). */
+class NativeExternalSorter extends ExternalSorter {
+
+  /** Whether {@link #sort()} was already called. */
+  private boolean sortCalled = false;
+
+  /** Sorter used to sort the input; created lazily by {@link #initSorter()}. */
+  private NativeFileSorter sorter;
+
+  private boolean initialized = false; // set once initSorter() has created the sorter
+
+  /** Returns a {@link Sorter} configured with the given {@link Options}. */
+  public static NativeExternalSorter create(Options options) {
+    return new NativeExternalSorter(options);
+  }
+
+  @Override
+  public void add(KV<byte[], byte[]> record) throws IOException {
+    checkState(!sortCalled, "Records can only be added before sort()");
+
+    initSorter();
+
+    sorter.add(record.getKey(), record.getValue());
+  }
+
+  @Override
+  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+    checkState(!sortCalled, "sort() can only be called once.");
+    sortCalled = true;
+
+    initSorter();
+
+    return sorter.sort();
+  }
+
+  private NativeExternalSorter(Options options) {
+    super(options);
+  }
+
+  /**
+   * Lazily creates the {@link NativeFileSorter} on first use; its constructor creates the input
+   * file under the configured temporary location. Later calls do nothing.
+   */
+  private void initSorter() throws IOException {
+    if (!initialized) {
+      sorter =
+          new NativeFileSorter(
+              Paths.get(options.getTempLocation()), (long) options.getMemoryMB() * 1024 * 1024);
+      initialized = true;
+    }
+  }
+}
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java
new file mode 100644
index 0000000..4d6ff32
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/NativeFileSorter.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.UnsignedBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External Sorter based on <a
+ * href="https://github.com/lemire/externalsortinginjava">lemire/externalsortinginjava</a>.
+ */
+class NativeFileSorter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NativeFileSorter.class);
+
+  private static final int MAX_TEMP_FILES = 1024;
+  private static final long OBJECT_OVERHEAD = getObjectOverhead();
+
+  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
+  private static final Comparator<KV<byte[], byte[]>> KV_COMPARATOR =
+      (x, y) -> COMPARATOR.compare(x.getKey(), y.getKey());
+  private static final ByteArrayCoder CODER = ByteArrayCoder.of();
+
+  private final Path tempDir;
+  private final long maxMemory;
+  private final File dataFile;
+  private final OutputStream dataStream;
+
+  private boolean sortCalled = false;
+
+  /** Creates a new file sorter, creating {@code tempDir} first if it does not exist yet. */
+  public NativeFileSorter(Path tempDir, long maxMemory) throws IOException {
+    this.tempDir = tempDir;
+    this.maxMemory = maxMemory;
+
+    Files.createDirectories(tempDir);
+    this.dataFile = Files.createTempFile(tempDir, "input", "seq").toFile();
+    this.dataStream = new BufferedOutputStream(new FileOutputStream(dataFile));
+    dataFile.deleteOnExit();
+
+    LOG.debug("Created input file {}", dataFile);
+  }
+
+  /**
+   * Adds a given record to the sorter.
+   *
+   * <p>Records can only be added before calling {@link #sort()}.
+   */
+  public void add(byte[] key, byte[] value) throws IOException {
+    Preconditions.checkState(!sortCalled, "Records can only be added before sort()");
+    CODER.encode(key, dataStream);
+    CODER.encode(value, dataStream);
+  }
+
+  /**
+   * Sorts the added elements and returns an {@link Iterable} over the sorted elements.
+   *
+   * <p>Can be called at most once.
+   */
+  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+    Preconditions.checkState(!sortCalled, "sort() can only be called once.");
+    sortCalled = true;
+
+    dataStream.close();
+    final List<File> sortedFiles = sortInBatch();
+    Files.deleteIfExists(dataFile.toPath()); // fully consumed into the sorted files above
+    return mergeSortedFiles(sortedFiles);
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Loads the file by blocks of records, sorts in memory, and writes the result to temporary files
+   * that have to be merged later.
+   */
+  private List<File> sortInBatch() throws IOException {
+    final long fileSize = Files.size(dataFile.toPath());
+    final long memory = maxMemory > 0 ? maxMemory : estimateAvailableMemory();
+    final long blockSize = estimateBestBlockSize(fileSize, memory); // in bytes
+    LOG.debug(
+        "Sort in batch with fileSize: {}, memory: {}, blockSize: {}", fileSize, memory, blockSize);
+
+    final List<File> files = new ArrayList<>();
+    try (InputStream inputStream = new BufferedInputStream(new FileInputStream(dataFile))) {
+      final List<KV<byte[], byte[]>> tempList = new ArrayList<>();
+      KV<byte[], byte[]> kv = KV.of(null, null);
+      while (kv != null) {
+        long currentBlockSize = 0;
+        while ((currentBlockSize < blockSize) && (kv = readKeyValue(inputStream)) != null) {
+          // as long as you have enough memory
+          tempList.add(kv);
+          currentBlockSize += estimateSizeOf(kv);
+        }
+        // The final batch is empty if the input is empty or ends exactly on a block boundary.
+        if (!tempList.isEmpty()) {
+          files.add(sortAndSave(tempList));
+          tempList.clear();
+        }
+      }
+    }
+    return files;
+  }
+
+  /** Sorts a list in place and saves it to a new temporary file. */
+  private File sortAndSave(List<KV<byte[], byte[]>> tempList) throws IOException {
+    final File tempFile = Files.createTempFile(tempDir, "sort", "seq").toFile();
+    tempFile.deleteOnExit();
+    LOG.debug("Sort and save {}", tempFile);
+
+    tempList.sort(KV_COMPARATOR);
+
+    try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
+      for (KV<byte[], byte[]> kv : tempList) {
+        CODER.encode(kv.getKey(), outputStream);
+        CODER.encode(kv.getValue(), outputStream);
+      }
+    }
+    return tempFile;
+  }
+
+  /** Lazily merges sorted files; every {@code iterator()} call reopens all of them. */
+  private Iterable<KV<byte[], byte[]>> mergeSortedFiles(List<File> files) {
+    return () -> {
+      final List<Iterator<KV<byte[], byte[]>>> iterators = new ArrayList<>();
+      for (File file : files) {
+        try {
+          iterators.add(iterateFile(file));
+        } catch (FileNotFoundException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+
+      return Iterators.mergeSorted(iterators, KV_COMPARATOR);
+    };
+  }
+
+  /** Iterates the key-value pairs in a file; the stream is closed once fully read. */
+  private Iterator<KV<byte[], byte[]>> iterateFile(File file) throws FileNotFoundException {
+    final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+    return new Iterator<KV<byte[], byte[]>>() {
+      KV<byte[], byte[]> nextKv = readNext();
+
+      @Override
+      public boolean hasNext() {
+        return nextKv != null;
+      }
+
+      @Override
+      public KV<byte[], byte[]> next() {
+        if (nextKv == null) {
+          throw new java.util.NoSuchElementException("No more records in " + file);
+        }
+        KV<byte[], byte[]> r = nextKv;
+        nextKv = readNext();
+        return r;
+      }
+
+      private KV<byte[], byte[]> readNext() {
+        try {
+          KV<byte[], byte[]> kv = readKeyValue(inputStream);
+          if (kv == null) {
+            inputStream.close(); // release the file handle as soon as the file is drained
+          }
+          return kv;
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+    };
+  }
+
+  /** Reads the next key-value pair, or returns {@code null} at a clean end of the stream. */
+  private KV<byte[], byte[]> readKeyValue(InputStream inputStream) throws IOException {
+    final byte[] keyBytes;
+    try {
+      keyBytes = CODER.decode(inputStream);
+    } catch (EOFException e) {
+      return null;
+    }
+    // EOF after a key means a truncated record: let the EOFException propagate, don't drop it.
+    return KV.of(keyBytes, CODER.decode(inputStream));
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * This method calls the garbage collector and then returns the free memory. This avoids problems
+   * with applications where the GC hasn't reclaimed memory and reports no available memory.
+   */
+  @SuppressFBWarnings("DM_GC")
+  private static long estimateAvailableMemory() {
+    System.gc();
+    // http://stackoverflow.com/questions/12807797/java-get-available-memory
+    final Runtime r = Runtime.getRuntime();
+    final long allocatedMemory = r.totalMemory() - r.freeMemory();
+    return r.maxMemory() - allocatedMemory;
+  }
+
+  /**
+   * We divide the file into small blocks. If the blocks are too small, we shall create too many
+   * temporary files. If they are too big, we shall be using too much memory.
+   *
+   * @param sizeOfFile how much data (in bytes) can we expect
+   * @param maxMemory Maximum memory to use (in bytes)
+   */
+  private static long estimateBestBlockSize(final long sizeOfFile, final long maxMemory) {
+    // we don't want to open up much more than MAX_TEMP_FILES temporary files, better run out of
+    // memory first.
+    long blockSize = sizeOfFile / MAX_TEMP_FILES + (sizeOfFile % MAX_TEMP_FILES == 0 ? 0 : 1);
+
+    // on the other hand, we don't want to create many temporary files for naught. If blockSize is
+    // smaller than half the free memory, grow it.
+    if (blockSize < maxMemory / 2) {
+      blockSize = maxMemory / 2;
+    }
+    return blockSize;
+  }
+
+  private static long getObjectOverhead() {
+    // By default we assume 64 bit JVM
+    // (defensive approach since we will get larger estimations in case we are not sure)
+    boolean is64BitJvm = true;
+    // check the system property "sun.arch.data.model"
+    // not very safe, as it might not work for all JVM implementations
+    // nevertheless the worst thing that might happen is that the JVM is 32bit
+    // but we assume its 64bit, so we will be counting a few extra bytes per string object
+    // no harm done here since this is just an approximation.
+    String arch = System.getProperty("sun.arch.data.model");
+    if (arch != null && arch.contains("32")) {
+      // If exists and is 32 bit then we assume a 32bit JVM
+      is64BitJvm = false;
+    }
+    // The sizes below are a bit rough as we don't take into account
+    // advanced JVM options such as compressed oops
+    // however if our calculation is not accurate it'll be a bit over
+    // so there is no danger of an out of memory error because of this.
+    long objectHeader = is64BitJvm ? 16 : 8;
+    long arrayHeader = is64BitJvm ? 24 : 12;
+    long objectRef = is64BitJvm ? 8 : 4;
+    return objectHeader + (objectRef + arrayHeader) * 2;
+  }
+
+  private static long estimateSizeOf(KV<byte[], byte[]> kv) {
+    return kv.getKey().length + kv.getValue().length + OBJECT_OVERHEAD;
+  }
+}
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java
new file mode 100644
index 0000000..620711f
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterBenchmark.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sorter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * {@link ExternalSorter} benchmarks.
+ *
+ * <p>For each {@link SorterType}, adds random UUID key/value pairs to a sorter, drains the sorted
+ * output, and prints the elapsed time. An optional first argument overrides the record count.
+ */
+public class ExternalSorterBenchmark {
+  // Default record count; each KV holds two 36-byte UUID strings, ~72MB of payload per million.
+  private static final int N = 1000 * 1000;
+
+  /** Runs the benchmark; {@code args[0]}, if present, is the number of records to sort. */
+  public static void main(String[] args) throws IOException {
+    int numRecords = args.length > 0 ? Integer.parseInt(args[0]) : N;
+    if (numRecords < 0) {
+      throw new IllegalArgumentException("record count must be non-negative: " + numRecords);
+    }
+
+    File tempDirectory = Files.createTempDirectory("sorter").toFile();
+    try {
+      ExternalSorter.Options options =
+          new ExternalSorter.Options().setMemoryMB(32).setTempLocation(tempDirectory.toString());
+
+      options.setSorterType(SorterType.HADOOP);
+      benchmark(ExternalSorter.create(options), numRecords);
+
+      options.setSorterType(SorterType.NATIVE);
+      benchmark(ExternalSorter.create(options), numRecords);
+    } finally {
+      // File#deleteOnExit only removes empty directories, so clean up explicitly.
+      deleteRecursively(tempDirectory);
+    }
+  }
+
+  /**
+   * Adds {@code numRecords} random KVs to {@code sorter}, iterates the sorted result and prints the
+   * elapsed time.
+   *
+   * @throws IllegalStateException if the sorted output does not contain exactly {@code numRecords}
+   *     records
+   */
+  private static void benchmark(Sorter sorter, int numRecords) throws IOException {
+    // nanoTime is monotonic, unlike currentTimeMillis, so it is the right clock for durations.
+    long startNanos = System.nanoTime();
+    for (int i = 0; i < numRecords; i++) {
+      sorter.add(
+          KV.of(
+              UUID.randomUUID().toString().getBytes(Charset.defaultCharset()),
+              UUID.randomUUID().toString().getBytes(Charset.defaultCharset())));
+    }
+    // Drain the whole result so that any lazily performed sort/merge work is included in the time.
+    long count = 0;
+    for (KV<byte[], byte[]> ignored : sorter.sort()) {
+      count++;
+    }
+    long elapsedNanos = System.nanoTime() - startNanos;
+    if (count != numRecords) {
+      throw new IllegalStateException(
+          String.format("Expected %d sorted records but got %d", numRecords, count));
+    }
+    System.out.printf("%s: %fs%n", sorter.getClass().getSimpleName(), elapsedNanos / 1e9);
+  }
+
+  /** Deletes {@code file} and, if it is a real (non-symlink) directory, everything beneath it. */
+  private static void deleteRecursively(File file) throws IOException {
+    File[] children = Files.isSymbolicLink(file.toPath()) ? null : file.listFiles();
+    if (children != null) {
+      for (File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Files.deleteIfExists(file.toPath());
+  }
+}
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
index fb0a7b8..f3b391e 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
@@ -25,20 +25,30 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.sdk.extensions.sorter.ExternalSorter.Options.SorterType;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /** Tests for Sorter. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class ExternalSorterTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   private static Path tmpLocation;
 
+  /** The sorter implementation exercised by this parameterized run. */
+  private final SorterType sorterType;
+
+  /** @param sorterType the sorter implementation under test, supplied by {@link #data()}. */
+  public ExternalSorterTest(SorterType sorterType) {
+    this.sorterType = sorterType;
+  }
+
   @BeforeClass
   public static void setupTempDir() throws IOException {
     tmpLocation = Files.createTempDirectory("tmp");
@@ -64,32 +74,46 @@ public class ExternalSorterTest {
         });
   }
 
+  /**
+   * Supplies one constructor argument per {@link SorterType} under test. The {@code name} pattern
+   * labels each run with its sorter type (e.g. {@code testEmpty[NATIVE]}) instead of a bare index.
+   */
+  @Parameters(name = "{0}")
+  public static Collection<SorterType[]> data() {
+    return Arrays.asList(
+        new SorterType[] {SorterType.HADOOP}, new SorterType[] {SorterType.NATIVE});
+  }
+
   @Test
   public void testEmpty() throws Exception {
+    // Runs SorterTestUtils.testEmpty against a sorter of the parameterized type.
     SorterTestUtils.testEmpty(
         ExternalSorter.create(
-            new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)));
   }
 
   @Test
   public void testSingleElement() throws Exception {
+    // Runs SorterTestUtils.testSingleElement against a sorter of the parameterized type.
     SorterTestUtils.testSingleElement(
         ExternalSorter.create(
-            new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)));
   }
 
   @Test
   public void testEmptyKeyValueElement() throws Exception {
+    // Runs SorterTestUtils.testEmptyKeyValueElement against a sorter of the parameterized type.
     SorterTestUtils.testEmptyKeyValueElement(
         ExternalSorter.create(
-            new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)));
   }
 
   @Test
   public void testMultipleIterations() throws Exception {
+    // Runs SorterTestUtils.testMultipleIterations against a sorter of the parameterized type.
     SorterTestUtils.testMultipleIterations(
         ExternalSorter.create(
-            new ExternalSorter.Options().setTempLocation(tmpLocation.toString())));
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)));
   }
 
   @Test
@@ -97,7 +121,9 @@ public class ExternalSorterTest {
     SorterTestUtils.testRandom(
         () ->
             ExternalSorter.create(
-                new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+                new ExternalSorter.Options()
+                    .setTempLocation(tmpLocation.toString())
+                    .setSorterType(sorterType)),
         1,
         1000000);
   }
@@ -105,7 +131,10 @@ public class ExternalSorterTest {
   @Test
   public void testAddAfterSort() throws Exception {
+    // Runs SorterTestUtils.testAddAfterSort with a sorter of the parameterized type and the
+    // ExpectedException rule.
     SorterTestUtils.testAddAfterSort(
-        ExternalSorter.create(new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+        ExternalSorter.create(
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)),
         thrown);
+    // Reached only if the helper returned without throwing, which is treated as a failure.
     fail();
   }
@@ -113,7 +142,10 @@ public class ExternalSorterTest {
   @Test
   public void testSortTwice() throws Exception {
+    // Runs SorterTestUtils.testSortTwice with a sorter of the parameterized type and the
+    // ExpectedException rule.
     SorterTestUtils.testSortTwice(
-        ExternalSorter.create(new ExternalSorter.Options().setTempLocation(tmpLocation.toString())),
+        ExternalSorter.create(
+            new ExternalSorter.Options()
+                .setTempLocation(tmpLocation.toString())
+                .setSorterType(sorterType)),
         thrown);
+    // Reached only if the helper returned without throwing, which is treated as a failure.
     fail();
   }
@@ -122,7 +154,7 @@ public class ExternalSorterTest {
   public void testNegativeMemory() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be greater than zero");
-    ExternalSorter.Options options = new ExternalSorter.Options();
+    ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
+    // A negative value is expected to raise the exception registered above, for every sorter type.
     options.setMemoryMB(-1);
   }
 
@@ -130,7 +162,7 @@ public class ExternalSorterTest {
   public void testZeroMemory() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be greater than zero");
-    ExternalSorter.Options options = new ExternalSorter.Options();
+    ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
+    // Zero is expected to raise the exception registered above, for every sorter type.
     options.setMemoryMB(0);
   }
 
@@ -138,7 +170,7 @@ public class ExternalSorterTest {
   public void testMemoryTooLarge() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("memoryMB must be less than 2048");
-    ExternalSorter.Options options = new ExternalSorter.Options();
+    ExternalSorter.Options options = new ExternalSorter.Options().setSorterType(sorterType);
+    // 2048 (the first rejected value per the expected message) must throw for every sorter type.
     options.setMemoryMB(2048);
   }
 }