You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/21 01:05:38 UTC

svn commit: r1470249 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/io/orc/

Author: hashutosh
Date: Sat Apr 20 23:05:38 2013
New Revision: 1470249

URL: http://svn.apache.org/r1470249
Log:
HIVE-4248 : Implement a memory manager for ORC (Owen Omalley via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Apr 20 23:05:38 2013
@@ -486,6 +486,9 @@ public class HiveConf extends Configurat
     HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true),
     HIVEUSERCFILESYNCCACHE("hive.exec.rcfile.use.sync.cache", true),
 
+    // Maximum fraction of heap that can be used by ORC file writers
+    HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1470249&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Sat Apr 20 23:05:38 2013
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By scaling down each writer's allocation when the pool is oversubscribed, we
+ * try to keep the task from running out of memory.
+ */
+class MemoryManager {
+  /**
+   * How much does the pool need to change between notifications?
+   */
+  private static final double NOTIFICATION_FACTOR = 1.1;
+  private final long totalMemoryPool;
+  private long notificationTrigger;
+  private final Map<Path, WriterInfo> writerList =
+      new HashMap<Path, WriterInfo>();
+  private long totalAllocation = 0;
+  private double currentScale = 1;
+  private double lastNotificationScale = 1;
+
+  private static class WriterInfo {
+    long allocation;
+    Callback callback;
+    WriterInfo(long allocation, Callback callback) {
+      this.allocation = allocation;
+      this.callback = callback;
+    }
+  }
+
+  public interface Callback {
+    void checkMemory(double newScale) throws IOException;
+  }
+
+  /**
+   * Create the memory manager.
+   * @param conf use the configuration to find the maximum size of the memory
+   *             pool.
+   */
+  MemoryManager(Configuration conf) {
+    HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
+    double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
+    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * maxLoad);
+    notificationTrigger = Math.round(totalMemoryPool * NOTIFICATION_FACTOR);
+  }
+
+  /**
+   * Add a new writer's memory allocation to the pool
+   * @param path the file that is being written
+   * @param requestedAllocation the amount of memory requested for the writer
+   */
+  synchronized void addWriter(Path path, long requestedAllocation,
+                              Callback callback) throws IOException {
+    WriterInfo oldVal = writerList.get(path);
+    if (oldVal == null) {
+      oldVal = new WriterInfo(requestedAllocation, callback);
+      writerList.put(path, oldVal);
+      totalAllocation += requestedAllocation;
+    } else {
+      totalAllocation += requestedAllocation - oldVal.allocation;
+      oldVal.allocation = requestedAllocation;
+      oldVal.callback = callback;
+    }
+    updateScale(true);
+  }
+
+  /**
+   * Remove the given writer from the pool.
+   * @param path the file that has been closed
+   */
+  synchronized void removeWriter(Path path) throws IOException {
+    WriterInfo val = writerList.get(path);
+    if (val != null) {
+      writerList.remove(path);
+      totalAllocation -= val.allocation;
+      updateScale(false);
+    }
+  }
+
+  /**
+   * Get the total pool size that is available for ORC writers.
+   * @return the number of bytes in the pool
+   */
+  long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * The scaling factor for each allocation to ensure that the pool isn't
+   * oversubscribed.
+   * @return a fraction between 0.0 and 1.0 of the requested size that is
+   * available for each writer.
+   */
+  synchronized double getAllocationScale() {
+    return currentScale;
+  }
+
+  /**
+   * Update the currentScale based on the current allocation and pool size.
+   * This also updates the notificationTrigger.
+   * @param isAllocate is this an allocation?
+   */
+  private void updateScale(boolean isAllocate) throws IOException {
+    if (totalAllocation <= totalMemoryPool) {
+      currentScale = 1;
+    } else {
+      currentScale = (double) totalMemoryPool / totalAllocation;
+    }
+    if (!isAllocate) {
+      // ensure that we notify if we drop 10% from the high water mark
+      notificationTrigger =
+          Math.min(notificationTrigger,
+              Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale));
+    } else {
+      // we've allocated a new writer, so check to see if we need to notify
+      if (totalAllocation > notificationTrigger) {
+        for(WriterInfo writer: writerList.values()) {
+          writer.callback.checkMemory(currentScale);
+        }
+        // set the next notification trigger
+        notificationTrigger =
+            Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale);
+      }
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Sat Apr 20 23:05:38 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -70,13 +71,23 @@ public final class OrcFile {
    */
   public static Writer createWriter(FileSystem fs,
                                     Path path,
+                                    Configuration conf,
                                     ObjectInspector inspector,
                                     long stripeSize,
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
     return new WriterImpl(fs, path, inspector, stripeSize, compress,
-      bufferSize, rowIndexStride);
+      bufferSize, rowIndexStride, getMemoryManager(conf));
   }
 
+  private static MemoryManager memoryManager = null;
+
+  private static synchronized
+  MemoryManager getMemoryManager(Configuration conf) {
+    if (memoryManager == null) {
+      memoryManager = new MemoryManager(conf);
+    }
+    return memoryManager;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Sat Apr 20 23:05:38 2013
@@ -71,8 +71,8 @@ public class OrcOutputFormat extends Fil
     public void write(NullWritable nullWritable,
                       OrcSerdeRow row) throws IOException {
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, row.getInspector(), stripeSize,
-          compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
+            stripeSize, compress, compressionSize, rowIndexStride);
       }
       writer.addRow(row.getRow());
     }
@@ -81,8 +81,9 @@ public class OrcOutputFormat extends Fil
     public void write(Writable row) throws IOException {
       OrcSerdeRow serdeRow = (OrcSerdeRow) row;
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, serdeRow.getInspector(),
-            stripeSize, compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf,
+            serdeRow.getInspector(), stripeSize, compress, compressionSize,
+            rowIndexStride);
       }
       writer.addRow(serdeRow.getRow());
     }
@@ -101,8 +102,8 @@ public class OrcOutputFormat extends Fil
         ObjectInspector inspector = ObjectInspectorFactory.
             getStandardStructObjectInspector(new ArrayList<String>(),
                 new ArrayList<ObjectInspector>());
-        writer = OrcFile.createWriter(fs, path, inspector, stripeSize,
-            compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf, inspector,
+            stripeSize, compress, compressionSize, rowIndexStride);
       }
       writer.close();
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Apr 20 23:05:38 2013
@@ -63,7 +63,7 @@ import com.google.protobuf.CodedOutputSt
  * sub-types. Each of the TreeWriters writes the column's data as a set of
  * streams.
  */
-class WriterImpl implements Writer {
+class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -98,6 +98,7 @@ class WriterImpl implements Writer {
   private final OrcProto.RowIndex.Builder rowIndex =
       OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
+  private final MemoryManager memoryManager;
 
   WriterImpl(FileSystem fs,
              Path path,
@@ -105,13 +106,15 @@ class WriterImpl implements Writer {
              long stripeSize,
              CompressionKind compress,
              int bufferSize,
-             int rowIndexStride) throws IOException {
+             int rowIndexStride,
+             MemoryManager memoryManager) throws IOException {
     this.fs = fs;
     this.path = path;
     this.stripeSize = stripeSize;
     this.compress = compress;
     this.bufferSize = bufferSize;
     this.rowIndexStride = rowIndexStride;
+    this.memoryManager = memoryManager;
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
     treeWriter = createTreeWriter(inspector, streamFactory, false);
@@ -119,6 +122,8 @@ class WriterImpl implements Writer {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
     }
+    // ensure that we are able to handle callbacks before we register ourselves
+    memoryManager.addWriter(path, stripeSize, this);
   }
 
   static CompressionCodec createCodec(CompressionKind kind) {
@@ -148,6 +153,13 @@ class WriterImpl implements Writer {
     }
   }
 
+  @Override
+  public void checkMemory(double newScale) throws IOException {
+    if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
+     flushStripe();
+    }
+  }
+
   /**
    * This class is used to hold the contents of streams as they are buffered.
    * The TreeWriters write to the outStream and the codec compresses the
@@ -1445,8 +1457,8 @@ class WriterImpl implements Writer {
       }
     }
     // once every 1000 rows, check the size to see if we should spill
-    if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
-      flushStripe();
+    if (rowsInStripe % 1000 == 0) {
+      checkMemory(memoryManager.getAllocationScale());
     }
   }
 
@@ -1456,5 +1468,6 @@ class WriterImpl implements Writer {
     int footerLength = writeFooter(rawWriter.getPos());
     rawWriter.writeByte(writePostScript(footerLength));
     rawWriter.close();
+    memoryManager.removeWriter(path);
   }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java Sat Apr 20 23:05:38 2013
@@ -94,7 +94,7 @@ public class TestFileDump {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         100000, CompressionKind.ZLIB, 10000, 10000);
     Random r1 = new Random(1);
     String[] words = new String[]{"It", "was", "the", "best", "of", "times,",

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1470249&r1=1470248&r2=1470249&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Sat Apr 20 23:05:38 2013
@@ -189,7 +189,7 @@ public class TestOrcFile {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         100000, CompressionKind.ZLIB, 10000, 10000);
     writer.addRow(new BigRow(false, (byte) 1, (short) 1024, 65536,
         Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0,1,2,3,4), "hi",
@@ -421,7 +421,7 @@ public class TestOrcFile {
           (InnerStruct.class,
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         1000, CompressionKind.NONE, 100, 1000);
     Random r1 = new Random(1);
     Random r2 = new Random(2);
@@ -504,7 +504,7 @@ public class TestOrcFile {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         1000, CompressionKind.NONE, 100, 10000);
     writer.close();
     Reader reader = OrcFile.createReader(fs, testFilePath);
@@ -524,7 +524,7 @@ public class TestOrcFile {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         1000, CompressionKind.NONE, 100, 10000);
     writer.addUserMetadata("my.meta", byteBuf(1, 2, 3, 4, 5, 6, 7, -1, -2, 127, -128));
     writer.addUserMetadata("clobber", byteBuf(1,2,3));
@@ -590,7 +590,7 @@ public class TestOrcFile {
       inspector = OrcStruct.createObjectInspector(0, types);
     }
     HiveDecimal maxValue = new HiveDecimal("100000000000000000000");
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         1000, CompressionKind.NONE, 100, 10000);
     OrcStruct row = new OrcStruct(3);
     OrcUnion union = new OrcUnion();
@@ -767,7 +767,7 @@ public class TestOrcFile {
           (InnerStruct.class,
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         1000, CompressionKind.SNAPPY, 100, 10000);
     Random rand = new Random(12);
     for(int i=0; i < 10000; ++i) {
@@ -802,7 +802,7 @@ public class TestOrcFile {
           (InnerStruct.class,
               ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         5000, CompressionKind.SNAPPY, 1000, 0);
     Random rand = new Random(24);
     for(int i=0; i < 10000; ++i) {
@@ -843,7 +843,7 @@ public class TestOrcFile {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
-    Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         200000, CompressionKind.ZLIB, 65536, 1000);
     Random rand = new Random(42);
     final int COUNT=32768;
@@ -936,4 +936,68 @@ public class TestOrcFile {
         (float) doubleValues[i], doubleValues[i], byteValues[i],stringValues[i],
         new MiddleStruct(inner, inner2), list(), map(inner,inner2));
   }
+
+  private static class MyMemoryManager extends MemoryManager {
+    final long totalSpace;
+    double rate;
+    Path path = null;
+    long lastAllocation = 0;
+
+    MyMemoryManager(Configuration conf, long totalSpace, double rate) {
+      super(conf);
+      this.totalSpace = totalSpace;
+      this.rate = rate;
+    }
+
+    @Override
+    void addWriter(Path path, long requestedAllocation,
+                   MemoryManager.Callback callback) {
+      this.path = path;
+      this.lastAllocation = requestedAllocation;
+    }
+
+    @Override
+    synchronized void removeWriter(Path path) {
+      this.path = null;
+      this.lastAllocation = 0;
+    }
+
+    @Override
+    long getTotalMemoryPool() {
+      return totalSpace;
+    }
+
+    @Override
+    double getAllocationScale() {
+      return rate;
+    }
+  }
+
+  @Test
+  public void testMemoryManagement() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (InnerStruct.class,
+              ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
+    Writer writer = new WriterImpl(fs, testFilePath, inspector,
+        50000, CompressionKind.NONE, 100, 0, memory);
+    assertEquals(testFilePath, memory.path);
+    for(int i=0; i < 2500; ++i) {
+      writer.addRow(new InnerStruct(i*300, Integer.toHexString(10*i)));
+    }
+    writer.close();
+    assertEquals(null, memory.path);
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    int i = 0;
+    for(StripeInformation stripe: reader.getStripes()) {
+      i += 1;
+      assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
+          stripe.getDataLength() < 10000);
+    }
+    assertEquals(3, i);
+    assertEquals(2500, reader.getNumberOfRows());
+  }
 }