You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2013/06/17 06:41:57 UTC
svn commit: r1493638 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: PTFOperator.java
PTFPartition.java PTFPersistence.java
Author: navis
Date: Mon Jun 17 04:41:57 2013
New Revision: 1493638
URL: http://svn.apache.org/r1493638
Log:
HIVE-4708 : Temporary files are not closed in PTFPersistence on jvm reuse (Navis reviewed by Harish Butani)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Mon Jun 17 04:41:57 2013
@@ -107,6 +107,7 @@ public class PTFOperator extends Operato
processInputPartition();
}
}
+ inputPart.close();
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Mon Jun 17 04:41:57 2013
@@ -152,6 +152,10 @@ public class PTFPartition
return new PItr(start, end);
}
+ public void close() {
+ elems.close();
+ }
+
class PItr implements PTFPartitionIterator<Object>
{
int idx;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java Mon Jun 17 04:41:57 2013
@@ -37,6 +37,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -287,6 +289,11 @@ public class PTFPersistence {
bldr.append("]\n");
}
+ public void close() {
+ bytes = null;
+ offsetsArray = null;
+ }
+
class WIterator implements Iterator<Writable>
{
Writable wObj;
@@ -403,6 +410,12 @@ public class PTFPersistence {
public static class PartitionedByteBasedList extends ByteBasedList
{
+ private static final ShutdownHook hook = new ShutdownHook();
+
+ static {
+ Runtime.getRuntime().addShutdownHook(hook);
+ }
+
ArrayList<ByteBasedList> partitions;
ArrayList<Integer> partitionOffsets;
ArrayList<File> reusableFiles;
@@ -413,8 +426,7 @@ public class PTFPersistence {
{
this.batchSize = batchSize;
currentSize = 0;
- dir = PartitionedByteBasedList.createTempDir();
- Runtime.getRuntime().addShutdownHook(new ShutdownHook(dir));
+ hook.register(dir = PartitionedByteBasedList.createTempDir());
partitions = new ArrayList<ByteBasedList>();
partitionOffsets = new ArrayList<Integer>();
@@ -446,6 +458,22 @@ public class PTFPersistence {
}
}
+ @Override
+ public void close() {
+ super.close();
+ reusableFiles.clear();
+ partitionOffsets.clear();
+ for (ByteBasedList partition : partitions) {
+ partition.close();
+ }
+ partitions.clear();
+ try {
+ PartitionedByteBasedList.deleteRecursively(dir);
+ } catch (Exception e) {
+ }
+ hook.unregister(dir);
+ }
+
private void addPartition() throws HiveException
{
try
@@ -636,11 +664,14 @@ public class PTFPersistence {
static class ShutdownHook extends Thread
{
- File dir;
+ private final Set<File> dirs = new LinkedHashSet<File>();
- public ShutdownHook(File dir)
- {
- this.dir = dir;
+ public void register(File dir) {
+ dirs.add(dir);
+ }
+
+ public void unregister(File dir) {
+ dirs.remove(dir);
}
@Override
@@ -648,9 +679,11 @@ public class PTFPersistence {
{
try
{
- PartitionedByteBasedList.deleteRecursively(dir);
+ for (File dir : dirs) {
+ PartitionedByteBasedList.deleteRecursively(dir);
+ }
}
- catch(IOException ie)
+ catch(Exception ie)
{
}
}
@@ -852,6 +885,21 @@ public class PTFPersistence {
throw new HiveException("Reset on PersistentByteBasedList not supported");
}
+ @Override
+ public void close() {
+ super.close();
+ ByteBasedList list = memList.get();
+ if (list != null) {
+ list.close();
+ }
+ memList.clear();
+ try {
+ PartitionedByteBasedList.deleteRecursively(file);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
private ByteBasedList getList() throws HiveException
{
PTFPersistence.lock(lock.readLock());