You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/10/08 22:49:57 UTC
[orc] branch master updated: ORC-361: Remove the single thread
restriction on the MemoryManagerImpl.
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new d657ed4 ORC-361: Remove the single thread restriction on the MemoryManagerImpl.
d657ed4 is described below
commit d657ed4f516d21e3b573c3ec4a4f31867a8ccf70
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Fri Oct 4 15:08:10 2019 -0700
ORC-361: Remove the single thread restriction on the MemoryManagerImpl.
Fixes #433
Signed-off-by: Owen O'Malley <om...@apache.org>
---
.../src/java/org/apache/orc/MemoryManager.java | 19 +++-
java/core/src/java/org/apache/orc/OrcFile.java | 15 +--
.../org/apache/orc/impl/MemoryManagerImpl.java | 122 +++++----------------
.../src/java/org/apache/orc/impl/WriterImpl.java | 45 +++++---
.../src/test/org/apache/orc/TestVectorOrcFile.java | 100 ++++-------------
.../org/apache/orc/impl/TestMemoryManager.java | 14 +--
6 files changed, 105 insertions(+), 210 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/MemoryManager.java b/java/core/src/java/org/apache/orc/MemoryManager.java
index 3afd3f5..258f381 100644
--- a/java/core/src/java/org/apache/orc/MemoryManager.java
+++ b/java/core/src/java/org/apache/orc/MemoryManager.java
@@ -36,10 +36,10 @@ public interface MemoryManager {
interface Callback {
/**
- * The writer needs to check its memory usage
+ * The scale factor for the stripe size has changed and thus the
+ * writer should adjust its desired size appropriately.
* @param newScale the current scale factor for memory allocations
* @return true if the writer was over the limit
- * @throws IOException
*/
boolean checkMemory(double newScale) throws IOException;
}
@@ -63,6 +63,21 @@ public interface MemoryManager {
* Give the memory manager an opportunity for doing a memory check.
* @param rows number of rows added
* @throws IOException
+ * @deprecated Use {@link MemoryManager#checkMemory} instead
*/
void addedRow(int rows) throws IOException;
+
+ /**
+ * As part of adding rows, the writer calls this method to determine
+ * if the scale factor has changed. If it has changed, the Callback will be
+ * called.
+ * @param previousAllocation the previous allocation
+ * @param writer the callback to invoke if the scale factor has changed
+ * @return the current allocation
+ */
+ default long checkMemory(long previousAllocation,
+ Callback writer) throws IOException {
+ addedRow(1024);
+ return previousAllocation;
+ }
}
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 803b609..e1dced0 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -942,19 +942,14 @@ public class OrcFile {
return new WriterOptions(tableProperties, conf);
}
- private static ThreadLocal<MemoryManager> memoryManager = null;
+ private static MemoryManager memoryManager = null;
- private static synchronized MemoryManager getStaticMemoryManager(
- final Configuration conf) {
+ private static synchronized
+ MemoryManager getStaticMemoryManager(Configuration conf) {
if (memoryManager == null) {
- memoryManager = new ThreadLocal<MemoryManager>() {
- @Override
- protected MemoryManager initialValue() {
- return new MemoryManagerImpl(conf);
- }
- };
+ memoryManager = new MemoryManagerImpl(conf);
}
- return memoryManager.get();
+ return memoryManager;
}
/**
diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
index ac589a0..4e78450 100644
--- a/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.orc.impl;
import org.apache.orc.MemoryManager;
import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -29,7 +27,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Implements a memory manager that keeps a global context of how many ORC
@@ -37,40 +35,20 @@ import java.util.concurrent.locks.ReentrantLock;
* dynamic partitions, it is easy to end up with many writers in the same task.
* By managing the size of each allocation, we try to cut down the size of each
* allocation and keep the task from running out of memory.
- *
+ *
* This class is not thread safe, but is re-entrant - ensure creation and all
* invocations are triggered from the same thread.
*/
public class MemoryManagerImpl implements MemoryManager {
- private static final Logger LOG = LoggerFactory.getLogger(MemoryManagerImpl.class);
-
- /**
- * How often should we check the memory sizes? Measured in rows added
- * to all of the writers.
- */
- final long ROWS_BETWEEN_CHECKS;
private final long totalMemoryPool;
- private final Map<Path, WriterInfo> writerList =
- new HashMap<Path, WriterInfo>();
- private long totalAllocation = 0;
- private double currentScale = 1;
- private int rowsAddedSinceCheck = 0;
- private final OwnedLock ownerLock = new OwnedLock();
-
- @SuppressWarnings("serial")
- private static class OwnedLock extends ReentrantLock {
- public Thread getOwner() {
- return super.getOwner();
- }
- }
+ private final Map<Path, WriterInfo> writerList = new HashMap<>();
+ private final AtomicLong totalAllocation = new AtomicLong(0);
private static class WriterInfo {
long allocation;
- Callback callback;
- WriterInfo(long allocation, Callback callback) {
+ WriterInfo(long allocation) {
this.allocation = allocation;
- this.callback = callback;
}
}
@@ -80,26 +58,16 @@ public class MemoryManagerImpl implements MemoryManager {
* pool.
*/
public MemoryManagerImpl(Configuration conf) {
- double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
- ROWS_BETWEEN_CHECKS = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
- LOG.info(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "=" + ROWS_BETWEEN_CHECKS);
- if(ROWS_BETWEEN_CHECKS < 1 || ROWS_BETWEEN_CHECKS > 10000) {
- throw new IllegalArgumentException(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "="
- + ROWS_BETWEEN_CHECKS + " is outside valid range [1,10000].");
- }
- totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
- getHeapMemoryUsage().getMax() * maxLoad);
- ownerLock.lock();
+ this(Math.round(ManagementFactory.getMemoryMXBean().
+ getHeapMemoryUsage().getMax() * OrcConf.MEMORY_POOL.getDouble(conf)));
}
/**
- * Light weight thread-safety check for multi-threaded access patterns
+ * Create the memory manager
+ * @param poolSize the size of memory to use
*/
- private void checkOwner() {
- if (!ownerLock.isHeldByCurrentThread()) {
- LOG.warn("Owner thread expected {}, got {}",
- ownerLock.getOwner(), Thread.currentThread());
- }
+ public MemoryManagerImpl(long poolSize) {
+ totalMemoryPool = poolSize;
}
/**
@@ -108,43 +76,32 @@ public class MemoryManagerImpl implements MemoryManager {
* @param path the file that is being written
* @param requestedAllocation the requested buffer size
*/
- public void addWriter(Path path, long requestedAllocation,
+ public synchronized void addWriter(Path path, long requestedAllocation,
Callback callback) throws IOException {
- checkOwner();
WriterInfo oldVal = writerList.get(path);
// this should always be null, but we handle the case where the memory
// manager wasn't told that a writer wasn't still in use and the task
// starts writing to the same path.
if (oldVal == null) {
- oldVal = new WriterInfo(requestedAllocation, callback);
+ oldVal = new WriterInfo(requestedAllocation);
writerList.put(path, oldVal);
- totalAllocation += requestedAllocation;
+ totalAllocation.addAndGet(requestedAllocation);
} else {
// handle a new writer that is writing to the same path
- totalAllocation += requestedAllocation - oldVal.allocation;
+ totalAllocation.addAndGet(requestedAllocation - oldVal.allocation);
oldVal.allocation = requestedAllocation;
- oldVal.callback = callback;
}
- updateScale(true);
}
/**
* Remove the given writer from the pool.
* @param path the file that has been closed
*/
- public void removeWriter(Path path) throws IOException {
- checkOwner();
+ public synchronized void removeWriter(Path path) throws IOException {
WriterInfo val = writerList.get(path);
if (val != null) {
writerList.remove(path);
- totalAllocation -= val.allocation;
- if (writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- updateScale(false);
- }
- if(writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
+ totalAllocation.addAndGet(-val.allocation);
}
}
@@ -163,48 +120,29 @@ public class MemoryManagerImpl implements MemoryManager {
* available for each writer.
*/
public double getAllocationScale() {
- return currentScale;
+ long alloc = totalAllocation.get();
+ return alloc <= totalMemoryPool ? 1.0 : (double) totalMemoryPool / alloc;
}
- /**
- * Give the memory manager an opportunity for doing a memory check.
- * @param rows number of rows added
- * @throws IOException
- */
@Override
public void addedRow(int rows) throws IOException {
- rowsAddedSinceCheck += rows;
- if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
- notifyWriters();
- }
+ // PASS
}
/**
- * Notify all of the writers that they should check their memory usage.
- * @throws IOException
+ * Obsolete method left for Hive, which extends this class.
+ * @deprecated remove this method
*/
public void notifyWriters() throws IOException {
- checkOwner();
- LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
- for(WriterInfo writer: writerList.values()) {
- boolean flushed = writer.callback.checkMemory(currentScale);
- if (LOG.isDebugEnabled() && flushed) {
- LOG.debug("flushed " + writer.toString());
- }
- }
- rowsAddedSinceCheck = 0;
+ // PASS
}
- /**
- * Update the currentScale based on the current allocation and pool size.
- * This also updates the notificationTrigger.
- * @param isAllocate is this an allocation?
- */
- private void updateScale(boolean isAllocate) throws IOException {
- if (totalAllocation <= getTotalMemoryPool()) {
- currentScale = 1;
- } else {
- currentScale = (double) getTotalMemoryPool() / totalAllocation;
+ @Override
+ public long checkMemory(long previous, Callback writer) throws IOException {
+ long current = totalAllocation.get();
+ if (current != previous) {
+ writer.checkMemory(getAllocationScale());
}
+ return current;
}
}
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 9e65f2c..31a401a 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -87,7 +87,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private static final int MIN_ROW_INDEX_STRIDE = 1000;
private final Path path;
- private long adjustedStripeSize;
+ private final long stripeSize;
private final int rowIndexStride;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
@@ -107,6 +107,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private final TreeWriter treeWriter;
private final boolean buildIndex;
private final MemoryManager memoryManager;
+ private long previousAllocation = -1;
+ private long memoryLimit;
+ private final long ROWS_PER_CHECK;
+ private long rowsSinceCheck = 0;
private final OrcFile.Version version;
private final Configuration conf;
private final OrcFile.WriterCallback callback;
@@ -178,7 +182,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
}
this.writeTimeZone = hasTimestamp(schema);
this.useUTCTimeZone = opts.getUseUTCTimestamp();
- this.adjustedStripeSize = opts.getStripeSize();
+ this.stripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
@@ -212,9 +216,11 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
MIN_ROW_INDEX_STRIDE);
}
// ensure that we are able to handle callbacks before we register ourselves
- memoryManager.addWriter(path, opts.getStripeSize(), this);
+ ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
+ memoryLimit = stripeSize;
+ memoryManager.addWriter(path, stripeSize, this);
LOG.info("ORC writer created for path: {} with stripeSize: {} options: {}",
- path, adjustedStripeSize, unencryptedOptions);
+ path, stripeSize, unencryptedOptions);
}
//@VisibleForTesting
@@ -226,7 +232,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
// sizes.
int estBufferSize = (int) (stripeSize / (20L * numColumns));
estBufferSize = getClosestBufferSize(estBufferSize);
- return estBufferSize > bs ? bs : estBufferSize;
+ return Math.min(estBufferSize, bs);
}
@Override
@@ -286,15 +292,22 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
@Override
public boolean checkMemory(double newScale) throws IOException {
- long limit = Math.round(adjustedStripeSize * newScale);
- long size = treeWriter.estimateMemory();
- if (LOG.isDebugEnabled()) {
- LOG.debug("ORC writer " + physicalWriter + " size = " + size +
- " limit = " + limit);
- }
- if (size > limit) {
- flushStripe();
- return true;
+ memoryLimit = Math.round(stripeSize * newScale);
+ return checkMemory();
+ }
+
+ private boolean checkMemory() throws IOException {
+ if (rowsSinceCheck >= ROWS_PER_CHECK) {
+ rowsSinceCheck = 0;
+ long size = treeWriter.estimateMemory();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+ " limit = " + memoryLimit);
+ }
+ if (size > memoryLimit) {
+ flushStripe();
+ return true;
+ }
}
return false;
}
@@ -676,7 +689,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
rowsInStripe += batch.size;
treeWriter.writeRootBatch(batch, 0, batch.size);
}
- memoryManager.addedRow(batch.size);
+ rowsSinceCheck += batch.size;
+ previousAllocation = memoryManager.checkMemory(previousAllocation, this);
+ checkMemory();
} catch (Throwable t) {
try {
close();
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 6226719..a27d69a 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -20,6 +20,7 @@ package org.apache.orc;
import org.apache.orc.impl.InStream;
import org.apache.orc.impl.KeyProvider;
+import org.apache.orc.impl.MemoryManagerImpl;
import org.apache.orc.impl.OrcCodecPool;
import org.apache.orc.impl.WriterImpl;
@@ -2449,54 +2450,30 @@ public class TestVectorOrcFile {
new MiddleStruct(inner, inner2), list(), map(inner, inner2));
}
- private static class MyMemoryManager implements MemoryManager {
- double rate;
- Path path = null;
- int rows = 0;
- Callback callback;
-
- MyMemoryManager(Configuration conf, long totalSpace, double rate) {
- this.rate = rate;
- }
-
- @Override
- public void addWriter(Path path, long requestedAllocation,
- Callback callback) {
- this.path = path;
- this.callback = callback;
- }
-
- @Override
- public synchronized void removeWriter(Path path) {
- this.path = null;
- }
-
+ @Test
+ public void testMemoryManagement() throws Exception {
+ OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 100);
+ final long POOL_SIZE = 50_000;
+ TypeDescription schema = createInnerSchema();
+ MemoryManagerImpl memoryMgr = new MemoryManagerImpl(POOL_SIZE);
- @Override
- public void addedRow(int count) throws IOException {
- rows += count;
- if (rows % 100 == 0) {
- callback.checkMemory(rate);
- }
+ // set up 9 files that all request the full size; the writer below is the 10th.
+ MemoryManager.Callback ignore = newScale -> false;
+ for(int f=0; f < 9; ++f) {
+ memoryMgr.addWriter(new Path("file-" + f), POOL_SIZE, ignore);
}
- }
- @Test
- public void testMemoryManagementV11() throws Exception {
- Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
-
- TypeDescription schema = createInnerSchema();
- MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
.compress(CompressionKind.NONE)
- .stripeSize(50000)
+ .stripeSize(POOL_SIZE)
.bufferSize(100)
.rowIndexStride(0)
- .memory(memory)
+ .memory(memoryMgr)
.version(fileFormat));
- assertEquals(testFilePath, memory.path);
+ // check to make sure it is 10%
+ assertEquals(0.1, memoryMgr.getAllocationScale(), 0.001);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1;
for(int i=0; i < 2500; ++i) {
@@ -2506,56 +2483,17 @@ public class TestVectorOrcFile {
writer.addRowBatch(batch);
}
writer.close();
- assertEquals(null, memory.path);
+ assertEquals(0.111, memoryMgr.getAllocationScale(), 0.001);
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
int i = 0;
for(StripeInformation stripe: reader.getStripes()) {
i += 1;
assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
- stripe.getDataLength() < 5000);
- }
- assertEquals(25, i);
- assertEquals(2500, reader.getNumberOfRows());
- }
-
- @Test
- public void testMemoryManagementV12() throws Exception {
- Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
- TypeDescription schema = createInnerSchema();
- MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
- Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf)
- .setSchema(schema)
- .compress(CompressionKind.NONE)
- .stripeSize(50000)
- .bufferSize(100)
- .rowIndexStride(0)
- .memory(memory)
- .version(fileFormat));
- VectorizedRowBatch batch = schema.createRowBatch();
- assertEquals(testFilePath, memory.path);
- batch.size = 1;
- for(int i=0; i < 2500; ++i) {
- ((LongColumnVector) batch.cols[0]).vector[0] = i * 300;
- ((BytesColumnVector) batch.cols[1]).setVal(0,
- Integer.toHexString(10*i).getBytes(StandardCharsets.UTF_8));
- writer.addRowBatch(batch);
+ stripe.getDataLength() < POOL_SIZE);
}
- writer.close();
- assertEquals(null, memory.path);
- Reader reader = OrcFile.createReader(testFilePath,
- OrcFile.readerOptions(conf).filesystem(fs));
- int i = 0;
- for(StripeInformation stripe: reader.getStripes()) {
- i += 1;
- assertTrue(testFilePath + " stripe " + i + " is too long at " +
- stripe.getDataLength(), stripe.getDataLength() < 5000);
- }
- // with HIVE-7832, the dictionaries will be disabled after writing the first
- // stripe as there are too many distinct values. Hence only 3 stripes as
- // compared to 25 stripes in version 0.11 (above test case)
- assertEquals(3, i);
+ // 0.11 always uses the dictionary, so ends up with a lot more stripes
+ assertEquals(fileFormat == OrcFile.Version.V_0_11 ? 25 : 3, i);
assertEquals(2500, reader.getNumberOfRows());
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
index 109b95e..dab59de 100644
--- a/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
+++ b/java/core/src/test/org/apache/orc/impl/TestMemoryManager.java
@@ -79,18 +79,12 @@ public class TestMemoryManager {
Configuration conf = new Configuration();
conf.set("hive.exec.orc.memory.pool", "0.9");
MemoryManagerImpl mgr = new MemoryManagerImpl(conf);
- assertEquals("Wrong default ",
- OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf), mgr.ROWS_BETWEEN_CHECKS);
long mem =
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
System.err.print("Memory = " + mem);
long pool = mgr.getTotalMemoryPool();
assertTrue("Pool too small: " + pool, mem * 0.899 < pool);
assertTrue("Pool too big: " + pool, pool < mem * 0.901);
-
- conf.setLong(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 1234);
- mgr = new MemoryManagerImpl(conf);
- assertEquals("Wrong default ", 1234, mgr.ROWS_BETWEEN_CHECKS);
}
private static class DoubleMatcher extends BaseMatcher<Double> {
@@ -128,12 +122,12 @@ public class TestMemoryManager {
calls[i] = Mockito.mock(MemoryManager.Callback.class);
mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]);
}
- // add enough rows to get the memory manager to check the limits
- for(int i=0; i < 10000; ++i) {
- mgr.addedRow(1);
+ // check to make sure that they get scaled down
+ for(int i=0; i < calls.length; ++i) {
+ mgr.checkMemory(0, calls[i]);
}
for(int call=0; call < calls.length; ++call) {
- Mockito.verify(calls[call], Mockito.times(2))
+ Mockito.verify(calls[call])
.checkMemory(Matchers.doubleThat(closeTo(0.2, ERROR)));
}
}