You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/24 23:10:35 UTC
svn commit: r1471680 - in /hive/branches/branch-0.11:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/io/orc/
Author: hashutosh
Date: Wed Apr 24 21:10:34 2013
New Revision: 1471680
URL: http://svn.apache.org/r1471680
Log:
HIVE-4248 : Implement a memory manager for ORC (Owen Omalley via Ashutosh Chauhan)
Added:
hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
Modified:
hive/branches/branch-0.11/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
Modified: hive/branches/branch-0.11/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/branch-0.11/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr 24 21:10:34 2013
@@ -486,6 +486,9 @@ public class HiveConf extends Configurat
HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true),
HIVEUSERCFILESYNCCACHE("hive.exec.rcfile.use.sync.cache", true),
+ // Maximum fraction of heap that can be used by ORC file writers
+ HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+
HIVESKEWJOIN("hive.optimize.skewjoin", false),
HIVECONVERTJOIN("hive.auto.convert.join", true),
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
Added: hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1471680&view=auto
==============================================================================
--- hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (added)
+++ hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Wed Apr 24 21:10:34 2013
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * When the sum of the requested allocations exceeds the pool, every writer is
+ * scaled down by the same factor so that the task does not run out of memory.
+ *
+ * The mutating methods are synchronized on the manager. Note that the writer
+ * callbacks are invoked from addWriter while that lock is held.
+ */
+class MemoryManager {
+  /**
+   * How much does the pool need to change between notifications?
+   */
+  private static final double NOTIFICATION_FACTOR = 1.1;
+  /** Number of bytes that all of the registered writers may use together. */
+  private final long totalMemoryPool;
+  /** Total allocation above which the writers are notified again. */
+  private long notificationTrigger;
+  private final Map<Path, WriterInfo> writerList =
+      new HashMap<Path, WriterInfo>();
+  /** Sum of the requested allocations of all registered writers. */
+  private long totalAllocation = 0;
+  /** Fraction of its request that each writer may currently use. */
+  private double currentScale = 1;
+
+  /**
+   * The requested allocation and the callback of one registered writer.
+   */
+  private static class WriterInfo {
+    long allocation;
+    Callback callback;
+    WriterInfo(long allocation, Callback callback) {
+      this.allocation = allocation;
+      this.callback = callback;
+    }
+  }
+
+  /**
+   * Implemented by writers that want to be told when the scale changes.
+   */
+  public interface Callback {
+    /**
+     * Ask the writer to check its memory use against the new scale.
+     * @param newScale the fraction of the requested allocation that the
+     *   writer may use
+     * @throws IOException if the writer fails while reacting to the change
+     */
+    void checkMemory(double newScale) throws IOException;
+  }
+
+  /**
+   * Create the memory manager.
+   * @param conf use the configuration to find the maximum size of the memory
+   *   pool.
+   */
+  MemoryManager(Configuration conf) {
+    HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
+    double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
+    long maxHeap =
+        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    if (maxHeap < 0) {
+      // getMax() reports -1 when the heap limit is undefined, which would
+      // otherwise produce an empty pool and a scale of zero for every writer
+      maxHeap = Runtime.getRuntime().maxMemory();
+    }
+    totalMemoryPool = Math.round(maxHeap * maxLoad);
+    notificationTrigger = Math.round(totalMemoryPool * NOTIFICATION_FACTOR);
+  }
+
+  /**
+   * Add a new writer's memory allocation to the pool. If the path is already
+   * registered, its allocation and callback are replaced.
+   * @param path the file that is being written
+   * @param requestedAllocation the requested buffer size
+   * @param callback the writer to notify when the scale changes
+   * @throws IOException if one of the notified writers fails
+   */
+  synchronized void addWriter(Path path, long requestedAllocation,
+                              Callback callback) throws IOException {
+    WriterInfo oldVal = writerList.get(path);
+    if (oldVal == null) {
+      oldVal = new WriterInfo(requestedAllocation, callback);
+      writerList.put(path, oldVal);
+      totalAllocation += requestedAllocation;
+    } else {
+      // only the difference to the previous request changes the total
+      totalAllocation += requestedAllocation - oldVal.allocation;
+      oldVal.allocation = requestedAllocation;
+      oldVal.callback = callback;
+    }
+    updateScale(true);
+  }
+
+  /**
+   * Remove the given writer from the pool. Unknown paths are ignored.
+   * @param path the file that has been closed
+   */
+  synchronized void removeWriter(Path path) throws IOException {
+    WriterInfo val = writerList.remove(path);
+    if (val != null) {
+      totalAllocation -= val.allocation;
+      updateScale(false);
+    }
+  }
+
+  /**
+   * Get the total pool size that is available for ORC writers.
+   * @return the number of bytes in the pool
+   */
+  long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * The scaling factor for each allocation to ensure that the pool isn't
+   * oversubscribed.
+   * @return a fraction between 0.0 and 1.0 of the requested size that is
+   *   available for each writer.
+   */
+  synchronized double getAllocationScale() {
+    return currentScale;
+  }
+
+  /**
+   * Update the currentScale based on the current allocation and pool size.
+   * This also updates the notificationTrigger.
+   * @param isAllocate is this an allocation?
+   * @throws IOException if one of the notified writers fails
+   */
+  private void updateScale(boolean isAllocate) throws IOException {
+    if (totalAllocation <= totalMemoryPool) {
+      currentScale = 1;
+    } else {
+      currentScale = (double) totalMemoryPool / totalAllocation;
+    }
+    if (!isAllocate) {
+      // ensure that we notify if we drop 10% from the high water mark
+      notificationTrigger =
+          Math.min(notificationTrigger,
+              Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale));
+    } else {
+      // we've allocated a new writer, so check to see if we need to notify
+      if (totalAllocation > notificationTrigger) {
+        for (WriterInfo writer : writerList.values()) {
+          writer.callback.checkMemory(currentScale);
+        }
+        // set the next notification trigger
+        notificationTrigger =
+            Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale);
+      }
+    }
+  }
+}
Modified: hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Wed Apr 24 21:10:34 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -70,13 +71,23 @@ public final class OrcFile {
*/
public static Writer createWriter(FileSystem fs,
Path path,
+ Configuration conf,
ObjectInspector inspector,
long stripeSize,
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
return new WriterImpl(fs, path, inspector, stripeSize, compress,
- bufferSize, rowIndexStride);
+ bufferSize, rowIndexStride, getMemoryManager(conf));
}
+  /** The memory manager handed to the writers; created on first use. */
+  private static MemoryManager memoryManager = null;
+
+  /**
+   * Get the shared memory manager, creating it on the first call. Only the
+   * configuration of that first call is used to build the manager; later
+   * calls return the existing instance and ignore their argument.
+   * @param conf the configuration to build the manager from, if needed
+   * @return the shared memory manager
+   */
+  private static synchronized
+      MemoryManager getMemoryManager(Configuration conf) {
+    MemoryManager shared = memoryManager;
+    if (shared == null) {
+      shared = new MemoryManager(conf);
+      memoryManager = shared;
+    }
+    return shared;
+  }
}
Modified: hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Wed Apr 24 21:10:34 2013
@@ -71,8 +71,8 @@ public class OrcOutputFormat extends Fil
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, row.getInspector(), stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.addRow(row.getRow());
}
@@ -81,8 +81,9 @@ public class OrcOutputFormat extends Fil
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, serdeRow.getInspector(),
- stripeSize, compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf,
+ serdeRow.getInspector(), stripeSize, compress, compressionSize,
+ rowIndexStride);
}
writer.addRow(serdeRow.getRow());
}
@@ -101,8 +102,8 @@ public class OrcOutputFormat extends Fil
ObjectInspector inspector = ObjectInspectorFactory.
getStandardStructObjectInspector(new ArrayList<String>(),
new ArrayList<ObjectInspector>());
- writer = OrcFile.createWriter(fs, path, inspector, stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, inspector,
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.close();
}
Modified: hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed Apr 24 21:10:34 2013
@@ -63,7 +63,7 @@ import com.google.protobuf.CodedOutputSt
* sub-types. Each of the TreeWriters writes the column's data as a set of
* streams.
*/
-class WriterImpl implements Writer {
+class WriterImpl implements Writer, MemoryManager.Callback {
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -98,6 +98,7 @@ class WriterImpl implements Writer {
private final OrcProto.RowIndex.Builder rowIndex =
OrcProto.RowIndex.newBuilder();
private final boolean buildIndex;
+ private final MemoryManager memoryManager;
WriterImpl(FileSystem fs,
Path path,
@@ -105,13 +106,15 @@ class WriterImpl implements Writer {
long stripeSize,
CompressionKind compress,
int bufferSize,
- int rowIndexStride) throws IOException {
+ int rowIndexStride,
+ MemoryManager memoryManager) throws IOException {
this.fs = fs;
this.path = path;
this.stripeSize = stripeSize;
this.compress = compress;
this.bufferSize = bufferSize;
this.rowIndexStride = rowIndexStride;
+ this.memoryManager = memoryManager;
buildIndex = rowIndexStride > 0;
codec = createCodec(compress);
treeWriter = createTreeWriter(inspector, streamFactory, false);
@@ -119,6 +122,8 @@ class WriterImpl implements Writer {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
}
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, stripeSize, this);
}
static CompressionCodec createCodec(CompressionKind kind) {
@@ -148,6 +153,13 @@ class WriterImpl implements Writer {
}
}
+  /**
+   * Flush the current stripe if its estimated size is larger than the stripe
+   * size scaled by the given factor.
+   * @param newScale the fraction of the stripe size that may be buffered
+   * @throws IOException if flushing the stripe fails
+   */
+  @Override
+  public void checkMemory(double newScale) throws IOException {
+    long scaledLimit = Math.round(stripeSize * newScale);
+    if (estimateStripeSize() > scaledLimit) {
+      flushStripe();
+    }
+  }
+
/**
* This class is used to hold the contents of streams as they are buffered.
* The TreeWriters write to the outStream and the codec compresses the
@@ -1445,8 +1457,8 @@ class WriterImpl implements Writer {
}
}
// once every 1000 rows, check the size to see if we should spill
- if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
- flushStripe();
+ if (rowsInStripe % 1000 == 0) {
+ checkMemory(memoryManager.getAllocationScale());
}
}
@@ -1456,5 +1468,6 @@ class WriterImpl implements Writer {
int footerLength = writeFooter(rawWriter.getPos());
rawWriter.writeByte(writePostScript(footerLength));
rawWriter.close();
+ memoryManager.removeWriter(path);
}
}
Modified: hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java (original)
+++ hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java Wed Apr 24 21:10:34 2013
@@ -94,7 +94,7 @@ public class TestFileDump {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
100000, CompressionKind.ZLIB, 10000, 10000);
Random r1 = new Random(1);
String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
Added: hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java?rev=1471680&view=auto
==============================================================================
--- hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java (added)
+++ hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java Wed Apr 24 21:10:34 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+
+import java.lang.management.ManagementFactory;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doubleThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test the ORC memory manager.
+ */
+public class TestMemoryManager {
+  /** Tolerance used for every floating point comparison in this class. */
+  private static final double ERROR = 0.000001;
+
+  /** A callback that ignores every notification. */
+  private static class NullCallback implements MemoryManager.Callback {
+    @Override
+    public void checkMemory(double newScale) {
+      // PASS
+    }
+  }
+
+  /**
+   * Check the pool size and the scale as writers are added, resized and
+   * removed.
+   */
+  @Test
+  public void testBasics() throws Exception {
+    Configuration conf = new Configuration();
+    MemoryManager mgr = new MemoryManager(conf);
+    NullCallback callback = new NullCallback();
+    long poolSize = mgr.getTotalMemoryPool();
+    assertEquals(Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * 0.5f), poolSize);
+    assertEquals(1.0, mgr.getAllocationScale(), ERROR);
+    mgr.addWriter(new Path("p1"), 1000, callback);
+    assertEquals(1.0, mgr.getAllocationScale(), ERROR);
+    // adding the same path again replaces the previous request
+    mgr.addWriter(new Path("p1"), poolSize / 2, callback);
+    assertEquals(1.0, mgr.getAllocationScale(), ERROR);
+    mgr.addWriter(new Path("p2"), poolSize / 2, callback);
+    assertEquals(1.0, mgr.getAllocationScale(), ERROR);
+    // from here on the pool is oversubscribed
+    mgr.addWriter(new Path("p3"), poolSize / 2, callback);
+    assertEquals(0.6666667, mgr.getAllocationScale(), ERROR);
+    mgr.addWriter(new Path("p4"), poolSize / 2, callback);
+    assertEquals(0.5, mgr.getAllocationScale(), ERROR);
+    mgr.addWriter(new Path("p4"), 3 * poolSize / 2, callback);
+    assertEquals(0.3333333, mgr.getAllocationScale(), ERROR);
+    mgr.removeWriter(new Path("p1"));
+    mgr.removeWriter(new Path("p2"));
+    assertEquals(0.5, mgr.getAllocationScale(), ERROR);
+    mgr.removeWriter(new Path("p4"));
+    assertEquals(1.0, mgr.getAllocationScale(), ERROR);
+  }
+
+  /**
+   * Check that the pool size follows the configured fraction of the heap.
+   */
+  @Test
+  public void testConfig() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("hive.exec.orc.memory.pool", "0.9");
+    MemoryManager mgr = new MemoryManager(conf);
+    long mem =
+        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    System.err.println("Memory = " + mem);
+    long pool = mgr.getTotalMemoryPool();
+    assertTrue("Pool too small: " + pool, mem * 0.899 < pool);
+    assertTrue("Pool too big: " + pool, pool < mem * 0.901);
+  }
+
+  /** Matches doubles that are within an absolute error of a value. */
+  private static class DoubleMatcher extends BaseMatcher<Double> {
+    final double expected;
+    final double error;
+    DoubleMatcher(double expected, double error) {
+      this.expected = expected;
+      this.error = error;
+    }
+
+    @Override
+    public boolean matches(Object val) {
+      // anything that is not a Double (including null) simply does not match
+      if (!(val instanceof Double)) {
+        return false;
+      }
+      double dbl = (Double) val;
+      return Math.abs(dbl - expected) <= error;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("not sufficiently close to ");
+      description.appendText(Double.toString(expected));
+    }
+  }
+
+  private static DoubleMatcher closeTo(double value, double error) {
+    return new DoubleMatcher(value, error);
+  }
+
+  /**
+   * Check which additions notify the registered writers and with what scale.
+   */
+  @Test
+  public void testCallback() throws Exception {
+    Configuration conf = new Configuration();
+    MemoryManager mgr = new MemoryManager(conf);
+    long pool = mgr.getTotalMemoryPool();
+    MemoryManager.Callback[] calls = new MemoryManager.Callback[20];
+    for (int i = 0; i < calls.length; ++i) {
+      calls[i] = mock(MemoryManager.Callback.class);
+      mgr.addWriter(new Path(Integer.toString(i)), pool / 4, calls[i]);
+    }
+    // spills[i] is the scale that was sent out when writer i was added, or 0
+    // if adding writer i did not trigger a notification
+    double[] spills = new double[]{0, 0, 0, 0, 0.8, 0.666666666667,
+                                   0.571428571429, 0.5, 0.444444444444,
+                                   0.4, 0, 0.333333333333, 0, 0.285714285714,
+                                   0, 0.25, 0, 0.222222222222, 0, 0.2};
+    for (int spill = 0; spill < spills.length; ++spill) {
+      if (spills[spill] != 0) {
+        // every writer registered so far must have seen this scale once
+        for (int call = 0; call < spill + 1; ++call) {
+          verify(calls[call], times(1))
+              .checkMemory(doubleThat(closeTo(spills[spill], ERROR)));
+        }
+      }
+    }
+  }
+}
Modified: hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1471680&r1=1471679&r2=1471680&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/branches/branch-0.11/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Wed Apr 24 21:10:34 2013
@@ -189,7 +189,7 @@ public class TestOrcFile {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
100000, CompressionKind.ZLIB, 10000, 10000);
writer.addRow(new BigRow(false, (byte) 1, (short) 1024, 65536,
Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0,1,2,3,4), "hi",
@@ -421,7 +421,7 @@ public class TestOrcFile {
(InnerStruct.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
1000, CompressionKind.NONE, 100, 1000);
Random r1 = new Random(1);
Random r2 = new Random(2);
@@ -504,7 +504,7 @@ public class TestOrcFile {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
1000, CompressionKind.NONE, 100, 10000);
writer.close();
Reader reader = OrcFile.createReader(fs, testFilePath);
@@ -524,7 +524,7 @@ public class TestOrcFile {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
1000, CompressionKind.NONE, 100, 10000);
writer.addUserMetadata("my.meta", byteBuf(1, 2, 3, 4, 5, 6, 7, -1, -2, 127, -128));
writer.addUserMetadata("clobber", byteBuf(1,2,3));
@@ -590,7 +590,7 @@ public class TestOrcFile {
inspector = OrcStruct.createObjectInspector(0, types);
}
HiveDecimal maxValue = new HiveDecimal("100000000000000000000");
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
1000, CompressionKind.NONE, 100, 10000);
OrcStruct row = new OrcStruct(3);
OrcUnion union = new OrcUnion();
@@ -767,7 +767,7 @@ public class TestOrcFile {
(InnerStruct.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
1000, CompressionKind.SNAPPY, 100, 10000);
Random rand = new Random(12);
for(int i=0; i < 10000; ++i) {
@@ -802,7 +802,7 @@ public class TestOrcFile {
(InnerStruct.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
5000, CompressionKind.SNAPPY, 1000, 0);
Random rand = new Random(24);
for(int i=0; i < 10000; ++i) {
@@ -843,7 +843,7 @@ public class TestOrcFile {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
- Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
200000, CompressionKind.ZLIB, 65536, 1000);
Random rand = new Random(42);
final int COUNT=32768;
@@ -936,4 +936,68 @@ public class TestOrcFile {
(float) doubleValues[i], doubleValues[i], byteValues[i],stringValues[i],
new MiddleStruct(inner, inner2), list(), map(inner,inner2));
}
+
+  /**
+   * A memory manager stub that records the last registered writer and
+   * reports a fixed pool size and a fixed allocation scale.
+   */
+  private static class MyMemoryManager extends MemoryManager {
+    final long totalSpace;
+    double rate;
+    /** Path of the registered writer, or null if none is registered. */
+    Path path = null;
+    /** Allocation requested by the registered writer, or 0 if none. */
+    long lastAllocation = 0;
+
+    MyMemoryManager(Configuration conf, long totalSpace, double rate) {
+      super(conf);
+      this.totalSpace = totalSpace;
+      this.rate = rate;
+    }
+
+    // synchronized like removeWriter and the method that is overridden
+    @Override
+    synchronized void addWriter(Path path, long requestedAllocation,
+                                MemoryManager.Callback callback) {
+      this.path = path;
+      this.lastAllocation = requestedAllocation;
+    }
+
+    @Override
+    synchronized void removeWriter(Path path) {
+      this.path = null;
+      this.lastAllocation = 0;
+    }
+
+    @Override
+    long getTotalMemoryPool() {
+      return totalSpace;
+    }
+
+    @Override
+    double getAllocationScale() {
+      return rate;
+    }
+  }
+
+  /**
+   * Write 2500 rows through a writer whose memory manager always reports a
+   * scale of 0.1 and check the registration, the stripe lengths, the number
+   * of stripes and the number of rows.
+   */
+  @Test
+  public void testMemoryManagement() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          InnerStruct.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
+    Writer writer = new WriterImpl(fs, testFilePath, inspector,
+        50000, CompressionKind.NONE, 100, 0, memory);
+    // the writer registers itself with the manager when it is constructed
+    assertEquals(testFilePath, memory.path);
+    int row = 0;
+    while (row < 2500) {
+      writer.addRow(new InnerStruct(row * 300, Integer.toHexString(10 * row)));
+      row++;
+    }
+    writer.close();
+    // closing the writer removes it from the manager again
+    assertEquals(null, memory.path);
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    int stripeCount = 0;
+    for (StripeInformation stripe : reader.getStripes()) {
+      stripeCount++;
+      assertTrue("stripe " + stripeCount + " is too long at " +
+          stripe.getDataLength(), stripe.getDataLength() < 10000);
+    }
+    assertEquals(3, stripeCount);
+    assertEquals(2500, reader.getNumberOfRows());
+  }
}