You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2013/06/17 06:41:57 UTC

svn commit: r1493638 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: PTFOperator.java PTFPartition.java PTFPersistence.java

Author: navis
Date: Mon Jun 17 04:41:57 2013
New Revision: 1493638

URL: http://svn.apache.org/r1493638
Log:
HIVE-4708 : Temporary files are not closed in PTFPersistence on jvm reuse (Navis reviewed by Harish Butani)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Mon Jun 17 04:41:57 2013
@@ -107,6 +107,7 @@ public class PTFOperator extends Operato
         processInputPartition();
       }
     }
+    inputPart.close();
 	}
 
 	@Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Mon Jun 17 04:41:57 2013
@@ -152,6 +152,10 @@ public class PTFPartition
     return new PItr(start, end);
   }
 
+  public void close() {
+    elems.close();
+  }
+
   class PItr implements PTFPartitionIterator<Object>
   {
     int idx;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java?rev=1493638&r1=1493637&r2=1493638&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java Mon Jun 17 04:41:57 2013
@@ -37,6 +37,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -287,6 +289,11 @@ public class PTFPersistence {
       bldr.append("]\n");
     }
 
+    public void close() {
+      bytes = null;
+      offsetsArray = null;
+    }
+
     class WIterator implements Iterator<Writable>
     {
       Writable wObj;
@@ -403,6 +410,12 @@ public class PTFPersistence {
 
   public static class PartitionedByteBasedList extends ByteBasedList
   {
+    private static final ShutdownHook hook = new ShutdownHook();
+
+    static {
+      Runtime.getRuntime().addShutdownHook(hook);
+    }
+
     ArrayList<ByteBasedList> partitions;
     ArrayList<Integer> partitionOffsets;
     ArrayList<File> reusableFiles;
@@ -413,8 +426,7 @@ public class PTFPersistence {
     {
       this.batchSize = batchSize;
       currentSize = 0;
-      dir = PartitionedByteBasedList.createTempDir();
-      Runtime.getRuntime().addShutdownHook(new ShutdownHook(dir));
+      hook.register(dir = PartitionedByteBasedList.createTempDir());
 
       partitions = new ArrayList<ByteBasedList>();
       partitionOffsets = new ArrayList<Integer>();
@@ -446,6 +458,22 @@ public class PTFPersistence {
       }
     }
 
+    @Override
+    public void close() {
+      super.close();
+      reusableFiles.clear();
+      partitionOffsets.clear();
+      for (ByteBasedList partition : partitions) {
+        partition.close();
+      }
+      partitions.clear();
+      try {
+        PartitionedByteBasedList.deleteRecursively(dir);
+      } catch (Exception e) {
+      }
+      hook.unregister(dir);
+    }
+
     private void addPartition() throws HiveException
     {
       try
@@ -636,11 +664,14 @@ public class PTFPersistence {
 
     static class ShutdownHook extends Thread
     {
-      File dir;
+      private final Set<File> dirs = new LinkedHashSet<File>();
 
-      public ShutdownHook(File dir)
-      {
-        this.dir = dir;
+      public void register(File dir) {
+        dirs.add(dir);
+      }
+
+      public void unregister(File dir) {
+        dirs.remove(dir);
       }
 
       @Override
@@ -648,9 +679,11 @@ public class PTFPersistence {
       {
         try
         {
-          PartitionedByteBasedList.deleteRecursively(dir);
+          for (File dir : dirs) {
+            PartitionedByteBasedList.deleteRecursively(dir);
+          }
         }
-        catch(IOException ie)
+        catch(Exception ie)
         {
         }
       }
@@ -852,6 +885,21 @@ public class PTFPersistence {
       throw new HiveException("Reset on PersistentByteBasedList not supported");
     }
 
+    @Override
+    public void close() {
+      super.close();
+      ByteBasedList list = memList.get();
+      if (list != null) {
+        list.close();
+      }
+      memList.clear();
+      try {
+        PartitionedByteBasedList.deleteRecursively(file);
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+
     private ByteBasedList getList() throws HiveException
     {
       PTFPersistence.lock(lock.readLock());