You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2015/07/03 00:20:39 UTC

hive git commit: HIVE-10191: Remove per-row synchronization from ORC WriterImpl (Gopal V, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/branch-1 0dece6f37 -> 693ccf5d8


HIVE-10191: Remove per-row synchronization from ORC WriterImpl (Gopal V, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/693ccf5d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/693ccf5d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/693ccf5d

Branch: refs/heads/branch-1
Commit: 693ccf5d829311748c239620a6acca42a61cfb30
Parents: 0dece6f
Author: Gopal V <go...@apache.org>
Authored: Thu Jul 2 15:18:36 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Thu Jul 2 15:18:36 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/MemoryManager.java    | 39 ++++++++++++++---
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   | 15 ++++---
 .../hadoop/hive/ql/io/orc/WriterImpl.java       | 46 ++++++++++----------
 3 files changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
index 821bd35..6432d6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
@@ -24,10 +24,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Implements a memory manager that keeps a global context of how many ORC
@@ -35,9 +38,9 @@ import java.util.Map;
  * dynamic partitions, it is easy to end up with many writers in the same task.
  * By managing the size of each allocation, we try to cut down the size of each
  * allocation and keep the task from running out of memory.
- *
- * This class is thread safe and uses synchronization around the shared state
- * to prevent race conditions.
+ *
+ * This class is not thread safe, but it is re-entrant: its creation and all
+ * invocations must happen on the same thread.
  */
 class MemoryManager {
 
@@ -54,6 +57,14 @@ class MemoryManager {
   private long totalAllocation = 0;
   private double currentScale = 1;
   private int rowsAddedSinceCheck = 0;
+  private final OwnedLock ownerLock = new OwnedLock();
+
+  @SuppressWarnings("serial")
+  private static class OwnedLock extends ReentrantLock {
+    public Thread getOwner() {
+      return super.getOwner();
+    }
+  }
 
   private static class WriterInfo {
     long allocation;
@@ -84,6 +95,17 @@ class MemoryManager {
     double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
     totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
         getHeapMemoryUsage().getMax() * maxLoad);
+    ownerLock.lock();
+  }
+
+  /**
+   * Light weight thread-safety check for multi-threaded access patterns
+   */
+  private void checkOwner() {
+    Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
+        "Owner thread expected %s, got %s",
+        ownerLock.getOwner(),
+        Thread.currentThread());
   }
 
   /**
@@ -92,8 +114,9 @@ class MemoryManager {
    * @param path the file that is being written
    * @param requestedAllocation the requested buffer size
    */
-  synchronized void addWriter(Path path, long requestedAllocation,
+  void addWriter(Path path, long requestedAllocation,
                               Callback callback) throws IOException {
+    checkOwner();
     WriterInfo oldVal = writerList.get(path);
     // this should always be null, but we handle the case where the memory
     // manager wasn't told that a writer wasn't still in use and the task
@@ -115,7 +138,8 @@ class MemoryManager {
    * Remove the given writer from the pool.
    * @param path the file that has been closed
    */
-  synchronized void removeWriter(Path path) throws IOException {
+  void removeWriter(Path path) throws IOException {
+    checkOwner();
     WriterInfo val = writerList.get(path);
     if (val != null) {
       writerList.remove(path);
@@ -144,7 +168,7 @@ class MemoryManager {
    * @return a fraction between 0.0 and 1.0 of the requested size that is
    * available for each writer.
    */
-  synchronized double getAllocationScale() {
+  double getAllocationScale() {
     return currentScale;
   }
 
@@ -152,7 +176,7 @@ class MemoryManager {
    * Give the memory manager an opportunity for doing a memory check.
    * @throws IOException
    */
-  synchronized void addedRow() throws IOException {
+  void addedRow() throws IOException {
     if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
       notifyWriters();
     }
@@ -163,6 +187,7 @@ class MemoryManager {
    * @throws IOException
    */
   void notifyWriters() throws IOException {
+    checkOwner();
     LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
     for(WriterInfo writer: writerList.values()) {
       boolean flushed = writer.callback.checkMemory(currentScale);

http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 61ee8b9..4e2bd6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -503,14 +503,19 @@ public final class OrcFile {
                         .rowIndexStride(rowIndexStride));
   }
 
-  private static MemoryManager memoryManager = null;
+  private static ThreadLocal<MemoryManager> memoryManager = null;
 
-  private static synchronized
-  MemoryManager getMemoryManager(Configuration conf) {
+  private static synchronized MemoryManager getMemoryManager(
+      final Configuration conf) {
     if (memoryManager == null) {
-      memoryManager = new MemoryManager(conf);
+      memoryManager = new ThreadLocal<MemoryManager>() {
+        @Override
+        protected MemoryManager initialValue() {
+          return new MemoryManager(conf);
+        }
+      };
     }
-    return memoryManager;
+    return memoryManager.get();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index ba5c831..58c7577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -95,9 +95,14 @@ import com.google.protobuf.CodedOutputStream;
  * sub-types. Each of the TreeWriters writes the column's data as a set of
  * streams.
  *
- * This class is synchronized so that multi-threaded access is ok. In
- * particular, because the MemoryManager is shared between writers, this class
- * assumes that checkMemory may be called from a separate thread.
+ * Like most stream objects, this class is unsynchronized: the creation of an ORC file writer
+ * and all access to a single instance must happen on a single thread.
+ *
+ * There are no known cases today where a single writer is accessed from multiple threads.
+ *
+ * Caveat: the MemoryManager is thread-local and is created during WriterOptions creation, so
+ * that step must also be confined to the writer's thread.
+ *
  */
 public class WriterImpl implements Writer, MemoryManager.Callback {
 
@@ -342,7 +347,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
-  public synchronized boolean checkMemory(double newScale) throws IOException {
+  public boolean checkMemory(double newScale) throws IOException {
     long limit = (long) Math.round(adjustedStripeSize * newScale);
     long size = estimateStripeSize();
     if (LOG.isDebugEnabled()) {
@@ -2407,21 +2412,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
-  public synchronized void addUserMetadata(String name, ByteBuffer value) {
+  public void addUserMetadata(String name, ByteBuffer value) {
     userMetadata.put(name, ByteString.copyFrom(value));
   }
 
   @Override
   public void addRow(Object row) throws IOException {
-    synchronized (this) {
-      treeWriter.write(row);
-      rowsInStripe += 1;
-      if (buildIndex) {
-        rowsInIndex += 1;
-
-        if (rowsInIndex >= rowIndexStride) {
-          createRowIndexEntry();
-        }
+    treeWriter.write(row);
+    rowsInStripe += 1;
+    if (buildIndex) {
+      rowsInIndex += 1;
+
+      if (rowsInIndex >= rowIndexStride) {
+        createRowIndexEntry();
       }
     }
     memoryManager.addedRow();
@@ -2435,13 +2438,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // remove us from the memory manager so that we don't get any callbacks
     memoryManager.removeWriter(path);
     // actually close the file
-    synchronized (this) {
-      flushStripe();
-      int metadataLength = writeMetadata(rawWriter.getPos());
-      int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
-      rawWriter.writeByte(writePostScript(footerLength, metadataLength));
-      rawWriter.close();
-    }
+    flushStripe();
+    int metadataLength = writeMetadata(rawWriter.getPos());
+    int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
+    rawWriter.writeByte(writePostScript(footerLength, metadataLength));
+    rawWriter.close();
+
   }
 
   /**
@@ -2463,7 +2465,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
-  public synchronized long writeIntermediateFooter() throws IOException {
+  public long writeIntermediateFooter() throws IOException {
     // flush any buffered rows
     flushStripe();
     // write a footer