You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2015/07/03 00:20:39 UTC
hive git commit: HIVE-10191: Remove per-row synchronization from ORC
WriterImpl (Gopal V, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-1 0dece6f37 -> 693ccf5d8
HIVE-10191: Remove per-row synchronization from ORC WriterImpl (Gopal V, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/693ccf5d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/693ccf5d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/693ccf5d
Branch: refs/heads/branch-1
Commit: 693ccf5d829311748c239620a6acca42a61cfb30
Parents: 0dece6f
Author: Gopal V <go...@apache.org>
Authored: Thu Jul 2 15:18:36 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Thu Jul 2 15:18:36 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/MemoryManager.java | 39 ++++++++++++++---
.../apache/hadoop/hive/ql/io/orc/OrcFile.java | 15 ++++---
.../hadoop/hive/ql/io/orc/WriterImpl.java | 46 ++++++++++----------
3 files changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
index 821bd35..6432d6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
@@ -24,10 +24,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import com.google.common.base.Preconditions;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Implements a memory manager that keeps a global context of how many ORC
@@ -35,9 +38,9 @@ import java.util.Map;
* dynamic partitions, it is easy to end up with many writers in the same task.
* By managing the size of each allocation, we try to cut down the size of each
* allocation and keep the task from running out of memory.
- *
- * This class is thread safe and uses synchronization around the shared state
- * to prevent race conditions.
+ *
+ * This class is not thread-safe, but it is re-entrant: create it and invoke
+ * all of its methods from the same thread.
*/
class MemoryManager {
@@ -54,6 +57,14 @@ class MemoryManager {
private long totalAllocation = 0;
private double currentScale = 1;
private int rowsAddedSinceCheck = 0;
+ private final OwnedLock ownerLock = new OwnedLock();
+
+ @SuppressWarnings("serial")
+ private static class OwnedLock extends ReentrantLock {
+ public Thread getOwner() {
+ return super.getOwner();
+ }
+ }
private static class WriterInfo {
long allocation;
@@ -84,6 +95,17 @@ class MemoryManager {
double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
getHeapMemoryUsage().getMax() * maxLoad);
+ ownerLock.lock();
+ }
+
+ /**
+ * Lightweight check that the calling thread is the owner of this memory manager.
+ */
+ private void checkOwner() {
+ Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
+ "Owner thread expected %s, got %s",
+ ownerLock.getOwner(),
+ Thread.currentThread());
}
/**
@@ -92,8 +114,9 @@ class MemoryManager {
* @param path the file that is being written
* @param requestedAllocation the requested buffer size
*/
- synchronized void addWriter(Path path, long requestedAllocation,
+ void addWriter(Path path, long requestedAllocation,
Callback callback) throws IOException {
+ checkOwner();
WriterInfo oldVal = writerList.get(path);
// this should always be null, but we handle the case where the memory
// manager wasn't told that a writer wasn't still in use and the task
@@ -115,7 +138,8 @@ class MemoryManager {
* Remove the given writer from the pool.
* @param path the file that has been closed
*/
- synchronized void removeWriter(Path path) throws IOException {
+ void removeWriter(Path path) throws IOException {
+ checkOwner();
WriterInfo val = writerList.get(path);
if (val != null) {
writerList.remove(path);
@@ -144,7 +168,7 @@ class MemoryManager {
* @return a fraction between 0.0 and 1.0 of the requested size that is
* available for each writer.
*/
- synchronized double getAllocationScale() {
+ double getAllocationScale() {
return currentScale;
}
@@ -152,7 +176,7 @@ class MemoryManager {
* Give the memory manager an opportunity for doing a memory check.
* @throws IOException
*/
- synchronized void addedRow() throws IOException {
+ void addedRow() throws IOException {
if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
notifyWriters();
}
@@ -163,6 +187,7 @@ class MemoryManager {
* @throws IOException
*/
void notifyWriters() throws IOException {
+ checkOwner();
LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
for(WriterInfo writer: writerList.values()) {
boolean flushed = writer.callback.checkMemory(currentScale);
http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 61ee8b9..4e2bd6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -503,14 +503,19 @@ public final class OrcFile {
.rowIndexStride(rowIndexStride));
}
- private static MemoryManager memoryManager = null;
+ private static ThreadLocal<MemoryManager> memoryManager = null;
- private static synchronized
- MemoryManager getMemoryManager(Configuration conf) {
+ private static synchronized MemoryManager getMemoryManager(
+ final Configuration conf) {
if (memoryManager == null) {
- memoryManager = new MemoryManager(conf);
+ memoryManager = new ThreadLocal<MemoryManager>() {
+ @Override
+ protected MemoryManager initialValue() {
+ return new MemoryManager(conf);
+ }
+ };
}
- return memoryManager;
+ return memoryManager.get();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/693ccf5d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index ba5c831..58c7577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -95,9 +95,14 @@ import com.google.protobuf.CodedOutputStream;
* sub-types. Each of the TreeWriters writes the column's data as a set of
* streams.
*
- * This class is synchronized so that multi-threaded access is ok. In
- * particular, because the MemoryManager is shared between writers, this class
- * assumes that checkMemory may be called from a separate thread.
+ * This class is unsynchronized, like most Stream objects, so the creation of an OrcFile and all
+ * access to a single instance must happen on a single thread.
+ *
+ * No callers are currently known to create or access a single writer from different threads.
+ *
+ * Caveat: the MemoryManager is obtained while the WriterOptions are created, so that step must
+ * also be confined to the same thread.
+ *
*/
public class WriterImpl implements Writer, MemoryManager.Callback {
@@ -342,7 +347,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- public synchronized boolean checkMemory(double newScale) throws IOException {
+ public boolean checkMemory(double newScale) throws IOException {
long limit = (long) Math.round(adjustedStripeSize * newScale);
long size = estimateStripeSize();
if (LOG.isDebugEnabled()) {
@@ -2407,21 +2412,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- public synchronized void addUserMetadata(String name, ByteBuffer value) {
+ public void addUserMetadata(String name, ByteBuffer value) {
userMetadata.put(name, ByteString.copyFrom(value));
}
@Override
public void addRow(Object row) throws IOException {
- synchronized (this) {
- treeWriter.write(row);
- rowsInStripe += 1;
- if (buildIndex) {
- rowsInIndex += 1;
-
- if (rowsInIndex >= rowIndexStride) {
- createRowIndexEntry();
- }
+ treeWriter.write(row);
+ rowsInStripe += 1;
+ if (buildIndex) {
+ rowsInIndex += 1;
+
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
}
}
memoryManager.addedRow();
@@ -2435,13 +2438,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// remove us from the memory manager so that we don't get any callbacks
memoryManager.removeWriter(path);
// actually close the file
- synchronized (this) {
- flushStripe();
- int metadataLength = writeMetadata(rawWriter.getPos());
- int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
- rawWriter.writeByte(writePostScript(footerLength, metadataLength));
- rawWriter.close();
- }
+ flushStripe();
+ int metadataLength = writeMetadata(rawWriter.getPos());
+ int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
+ rawWriter.writeByte(writePostScript(footerLength, metadataLength));
+ rawWriter.close();
+
}
/**
@@ -2463,7 +2465,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- public synchronized long writeIntermediateFooter() throws IOException {
+ public long writeIntermediateFooter() throws IOException {
// flush any buffered rows
flushStripe();
// write a footer