You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2019/12/27 07:48:57 UTC

[GitHub] [drill] paul-rogers commented on a change in pull request #1929: DRILL-6832: Remove the old "unmanaged" external sort

paul-rogers commented on a change in pull request #1929: DRILL-6832: Remove the old "unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361574025
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##########
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * <p>
+ * The batches are defined by a schema which can change over time. When the schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * <p>
+ * The <code>BatchGroup</code> operates in two modes as given by the two
+ * subclasses:
 + * <ul>
 + * <li>Input mode ({@link InputBatch}): Used to buffer in-memory batches
 + * prior to spilling.</li>
 + * <li>Spill mode ({@link SpilledRun}): Holds a "memento" to a set
 + * of batches written to disk. Acts as both a reader and writer for
 + * those batches.</li>
 + * </ul>
 + */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, OperatorContext context) {
-    this.sv2 = sv2;
-    this.currentContainer = container;
-    this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * <ul>
+   * <li>Contains a single batch received from the upstream (input)
+   * operator.</li>
+   * <li>Associated selection vector that provides a sorted
+   * indirection to the values in the batch.</li>
+   * </ul>
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, OperatorContext context) {
-    currentContainer = container;
-    this.fs = fs;
-    this.path = new Path(path);
-    this.allocator = context.getAllocator();
-    this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+    private final SelectionVector2 sv2;
+    private final long dataSize;
+
+    public InputBatch(VectorContainer container, SelectionVector2 sv2, BufferAllocator allocator, long dataSize) {
+      super(container, allocator);
+      this.sv2 = sv2;
+      this.dataSize = dataSize;
+    }
 
-  public SelectionVector2 getSv2() {
-    return sv2;
+    public SelectionVector2 getSv2() { return sv2; }
+
+    public long getDataSize() { return dataSize; }
+
+    @Override
+    public int getRecordCount() {
+      if (sv2 != null) {
+        return sv2.getCount();
+      } else {
+        return super.getRecordCount();
+      }
+    }
+
+    @Override
+    public int getNextIndex() {
+      int val = super.getNextIndex();
+      if (val == -1) {
+        return val;
+      }
+      return sv2.getIndex(val);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (sv2 != null) {
+        sv2.clear();
+      }
+      super.close();
+    }
   }
 
   /**
-   * Updates the schema for this batch group. The current as well as any deserialized batches will be coerced to this schema
-   * @param schema
+   * Holds a set of spilled batches, represented by a file on disk.
+   * Handles reads from, and writes to the spill file. The data structure
+   * is:
+   * <ul>
+   * <li>A pointer to a file that contains serialized batches.</li>
+   * <li>When writing, each batch is appended to the output file.</li>
+   * <li>When reading, iterates over each spilled batch, and for each
+   * of those, each spilled record.</li>
+   * </ul>
+   * <p>
 + * Starts out with no current batch. Defines the current batch to be the
 + * "shell" (schema without data) of the last batch spilled to disk.
+   * <p>
+   * When reading, has destructive read-once behavior: closing the
+   * batch (after reading) deletes the underlying spill file.
+   * <p>
+   * This single class does three tasks: load data, hold data and
+   * read data. This should be split into three separate classes. But,
+   * the original (combined) structure is retained for expedience at
+   * present.
    */
-  public void setSchema(BatchSchema schema) {
-    currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context);
-    this.schema = schema;
-  }
 
-  public void addBatch(VectorContainer newContainer) throws IOException {
-    assert fs != null;
-    assert path != null;
-    if (outputStream == null) {
-      outputStream = fs.create(path);
+  public static class SpilledRun extends BatchGroup {
+    private InputStream inputStream;
+    private String path;
+    private SpillSet spillSet;
+    private BufferAllocator allocator;
+    private int spilledBatches;
+    private long batchSize;
+    private Writer writer;
+    private VectorSerializer.Reader reader;
+
+    public SpilledRun(SpillSet spillSet, String path, BufferAllocator allocator) throws IOException {
+      super(null, allocator);
+      this.spillSet = spillSet;
+      this.path = path;
+      this.allocator = allocator;
+      writer = spillSet.writer(path);
     }
-    int recordCount = newContainer.getRecordCount();
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, newContainer, false);
-    VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
-    Stopwatch watch = Stopwatch.createStarted();
-    outputBatch.writeToStream(outputStream);
-    newContainer.zeroVectors();
-    logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
-    spilledBatches++;
-  }
-
-  private VectorContainer getBatch() throws IOException {
-    assert fs != null;
-    assert path != null;
-    if (inputStream == null) {
-      inputStream = fs.open(path);
+
+    public void addBatch(VectorContainer newContainer) throws IOException {
+      writer.write(newContainer);
+      newContainer.zeroVectors();
+      logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(), writer.time(TimeUnit.MICROSECONDS));
+      spilledBatches++;
+
+      // Hold onto the husk of the last added container so that we have a
+      // current container when starting to read rows back later.
+
+      currentContainer = newContainer;
+      currentContainer.setRecordCount(0);
     }
-    VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
-    Stopwatch watch = Stopwatch.createStarted();
-    vas.readFromStream(inputStream);
-    VectorContainer c =  vas.get();
-    if (schema != null) {
-      c = SchemaUtil.coerceContainer(c, schema, context);
+
+    public void setBatchSize(long batchSize) {
+      this.batchSize = batchSize;
     }
-    logger.trace("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
-    spilledBatches--;
-    currentContainer.zeroVectors();
-    Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
-    for (VectorWrapper<?> w : currentContainer) {
-      TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
-      pair.transfer();
+
+    public long getBatchSize() { return batchSize; }
+    public String getPath() { return path; }
+
+    @Override
+    public int getNextIndex() {
+      if (pointer == getRecordCount()) {
+        if (spilledBatches == 0) {
+          return -1;
+        }
+        try {
+          currentContainer.zeroVectors();
+          getBatch();
+        } catch (IOException e) {
+          // Release any partially-loaded data.
+          currentContainer.clear();
+          throw UserException.dataReadError(e)
+              .message("Failure while reading spilled data")
+              .build(logger);
+        }
+
+        // The pointer indicates the NEXT index, not the one we
+        // return here. At this point, we just started reading a
+        // new batch and have returned index 0. So, the next index
+        // is 1.
+
+        pointer = 1;
+        return 0;
+      }
+      return super.getNextIndex();
     }
-    currentContainer.setRecordCount(c.getRecordCount());
-    c.zeroVectors();
-    return c;
-  }
 
-  public int getNextIndex() {
-    int val;
-    if (pointer == getRecordCount()) {
-      if (spilledBatches == 0) {
-        return -1;
+    private VectorContainer getBatch() throws IOException {
+      if (inputStream == null) {
+        inputStream = spillSet.openForInput(path);
+        reader = VectorSerializer.reader(allocator, inputStream);
+      }
+      Stopwatch watch = Stopwatch.createStarted();
+      long start = allocator.getAllocatedMemory();
+      VectorContainer c =  reader.read();
+      long end = allocator.getAllocatedMemory();
+      logger.trace("Read {} records in {} us; size = {}, memory = {}",
+                   c.getRecordCount(),
+                   watch.elapsed(TimeUnit.MICROSECONDS),
+                   (end - start), end);
+      if (schema != null) {
+        c = SchemaUtil.coerceContainer(c, schema, allocator);
+      }
+      spilledBatches--;
+      currentContainer.zeroVectors();
+      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
+      for (VectorWrapper<?> w : currentContainer) {
+        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
+        pair.transfer();
+      }
+      currentContainer.setRecordCount(c.getRecordCount());
+      c.zeroVectors();
+      return c;
+    }
+
+    /**
+     * Close resources owned by this batch group. Each can fail; report
+     * only the first error. This is cluttered because this class tries
+     * to do multiple tasks. TODO: Split into multiple classes.
+     */
+
+    @Override
+    public void close() throws IOException {
+      IOException ex = null;
+      try {
+        super.close();
+      } catch (IOException e) {
+        ex = e;
       }
       try {
-        currentContainer.zeroVectors();
-        getBatch();
+        closeWriter();
       } catch (IOException e) {
-        throw new RuntimeException(e);
+        ex = ex == null ? e : ex;
       }
-      pointer = 1;
-      return 0;
+      try {
+        closeInputStream();
+      } catch (IOException e) {
+        ex = ex == null ? e : ex;
+      }
+      try {
+        spillSet.delete(path);
+      } catch (IOException e) {
+        ex = ex == null ? e : ex;
+      }
+      if (ex != null) {
+        throw ex;
+      }
+    }
+
+    private void closeInputStream() throws IOException {
+      if (inputStream == null) {
+        return;
+      }
+      long readLength = spillSet.getPosition(inputStream);
+      spillSet.tallyReadBytes(readLength);
+      inputStream.close();
+      inputStream = null;
+      reader = null;
+      logger.trace("Summary: Read {} bytes from {}", readLength, path);
     }
-    if (sv2 == null) {
-      val = pointer;
-      pointer++;
-      assert val < currentContainer.getRecordCount();
-    } else {
-      val = pointer;
-      pointer++;
-      assert val < currentContainer.getRecordCount();
-      val = sv2.getIndex(val);
+
+    public void closeWriter() throws IOException {
+      if (writer != null) {
+        spillSet.close(writer);
+        logger.trace("Summary: Wrote {} bytes in {} us to {}", writer.getBytesWritten(), writer.time(TimeUnit.MICROSECONDS), path);
+        writer = null;
+      }
     }
+  }
+
+  protected VectorContainer currentContainer;
+  protected int pointer = 0;
 
 Review comment:
   You are assuming that I know what it is used for. :-) I did some sleuthing and tidied this up a bit. Let me know if it makes more sense now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services