You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/16 02:50:05 UTC

[arrow] branch master updated: ARROW-6199: [Java] Avro adapter avoid potential resource leak.

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dd4532a  ARROW-6199: [Java] Avro adapter avoid potential resource leak.
dd4532a is described below

commit dd4532a0cdaccf8e7811086bc5360b13ef9a6c36
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Thu Aug 15 19:49:53 2019 -0700

    ARROW-6199: [Java] Avro adapter avoid potential resource leak.
    
    Related to [ARROW-6199](https://issues.apache.org/jira/browse/ARROW-6199).
    
    Currently, the Avro consumer interface has no close API, which may cause resource leaks such as AvroBytesConsumer#cacheBuffer.
    To resolve this, make Consumer extend AutoCloseable and create CompositeAvroConsumer to encompass the consume and close logic.
    
    Closes #5059 from tianchen92/ARROW-6199 and squashes the following commits:
    
    d60d94c48 <tianchen> fix
    42f22da7c <tianchen> clear vectors in close
    5b91da75f <tianchen> fix comments
    3ffc07600 <tianchen> ARROW-6199:  Avro adapter avoid potential resource leak.
    
    Authored-by: tianchen <ni...@alibaba-inc.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../java/org/apache/arrow/AvroToArrowUtils.java    | 22 +++----
 .../arrow/consumers/AvroBooleanConsumer.java       |  5 ++
 .../apache/arrow/consumers/AvroBytesConsumer.java  |  5 ++
 .../apache/arrow/consumers/AvroDoubleConsumer.java |  5 ++
 .../apache/arrow/consumers/AvroFloatConsumer.java  |  5 ++
 .../apache/arrow/consumers/AvroIntConsumer.java    |  5 ++
 .../apache/arrow/consumers/AvroLongConsumer.java   |  5 ++
 .../apache/arrow/consumers/AvroNullConsumer.java   |  5 ++
 .../apache/arrow/consumers/AvroStringConsumer.java |  5 ++
 .../apache/arrow/consumers/AvroUnionsConsumer.java | 16 +++--
 .../arrow/consumers/CompositeAvroConsumer.java     | 69 ++++++++++++++++++++++
 .../java/org/apache/arrow/consumers/Consumer.java  |  7 ++-
 .../arrow/consumers/NullableTypeConsumer.java      |  5 ++
 13 files changed, 141 insertions(+), 18 deletions(-)

diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 25611a5..77f34df 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -20,7 +20,6 @@ package org.apache.arrow;
 import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
 import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -37,6 +36,7 @@ import org.apache.arrow.consumers.AvroLongConsumer;
 import org.apache.arrow.consumers.AvroNullConsumer;
 import org.apache.arrow.consumers.AvroStringConsumer;
 import org.apache.arrow.consumers.AvroUnionsConsumer;
+import org.apache.arrow.consumers.CompositeAvroConsumer;
 import org.apache.arrow.consumers.Consumer;
 import org.apache.arrow.consumers.NullableTypeConsumer;
 import org.apache.arrow.memory.BufferAllocator;
@@ -246,19 +246,15 @@ public class AvroToArrowUtils {
 
     VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
 
-    int valueCount = 0;
-    while (true) {
-      try {
-        for (Consumer consumer : consumers) {
-          consumer.consume(decoder);
-        }
-        valueCount++;
-        //reach end will throw EOFException.
-      } catch (EOFException eofException) {
-        root.setRowCount(valueCount);
-        break;
-      }
+    CompositeAvroConsumer compositeConsumer = null;
+    try {
+      compositeConsumer = new CompositeAvroConsumer(consumers);
+      compositeConsumer.consume(decoder, root);
+    } catch (Exception e) {
+      compositeConsumer.close();
+      throw new RuntimeException("Error occurs while consume process.", e);
     }
+
     return root;
   }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index b2fe704..c2876f1 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -63,4 +63,9 @@ public class AvroBooleanConsumer implements Consumer {
     return this.vector;
   }
 
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
+
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index 2c649f9..c0cfaec 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -79,4 +79,9 @@ public class AvroBytesConsumer implements Consumer {
   public FieldVector getVector() {
     return vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 63b2071..6538831 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -62,4 +62,9 @@ public class AvroDoubleConsumer implements Consumer {
   public FieldVector getVector() {
     return vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index ea752e2..6256a9a 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -62,4 +62,9 @@ public class AvroFloatConsumer implements Consumer {
   public FieldVector getVector() {
     return this.vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index ab830bc..854c8d0 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -62,4 +62,9 @@ public class AvroIntConsumer implements Consumer {
   public FieldVector getVector() {
     return this.vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 68acb94..e0095cc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -62,4 +62,9 @@ public class AvroLongConsumer implements Consumer {
   public FieldVector getVector() {
     return this.vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
index d06e2f5..1e32419 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
@@ -48,4 +48,9 @@ public class AvroNullConsumer implements Consumer {
   public FieldVector getVector() {
     return this.vector;
   }
+
+  @Override
+  public void close() {
+    vector.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index 1719bf7..850d699 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -80,4 +80,9 @@ public class AvroStringConsumer implements Consumer {
   public FieldVector getVector() {
     return this.vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
index 5277678..b927a5b 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -31,7 +31,7 @@ import org.apache.avro.io.Decoder;
  */
 public class AvroUnionsConsumer implements Consumer {
 
-  private Consumer[] indexDelegates;
+  private Consumer[] delegates;
   private Types.MinorType[] types;
 
   private UnionWriter writer;
@@ -40,11 +40,11 @@ public class AvroUnionsConsumer implements Consumer {
   /**
    * Instantiate a AvroUnionConsumer.
    */
-  public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
+  public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
 
     this.writer = new UnionWriter(vector);
     this.vector = vector;
-    this.indexDelegates = indexDelegates;
+    this.delegates = delegates;
     this.types = types;
   }
 
@@ -53,7 +53,7 @@ public class AvroUnionsConsumer implements Consumer {
     int fieldIndex = decoder.readInt();
     int position = writer.getPosition();
 
-    Consumer delegate = indexDelegates[fieldIndex];
+    Consumer delegate = delegates[fieldIndex];
 
     vector.setType(position, types[fieldIndex]);
     // In UnionVector we need to set sub vector writer position before consume a value
@@ -80,4 +80,12 @@ public class AvroUnionsConsumer implements Consumer {
     vector.setValueCount(writer.getPosition());
     return this.vector;
   }
+
+  @Override
+  public void close() throws Exception {
+    writer.close();
+    for (Consumer delegate: delegates) {
+      delegate.close();
+    }
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
new file mode 100644
index 0000000..0f4b3c3
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Composite consumer which hold all consumers.
+ * It manages the consume and cleanup process.
+ */
+public class CompositeAvroConsumer implements AutoCloseable {
+
+  private final List<Consumer> consumers;
+
+  public CompositeAvroConsumer(List<Consumer> consumers) {
+    this.consumers = consumers;
+  }
+
+  /**
+   * Consume decoder data and write into {@link VectorSchemaRoot}.
+   */
+  public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException {
+    int valueCount = 0;
+    while (true) {
+      try {
+        for (Consumer consumer : consumers) {
+          consumer.consume(decoder);
+        }
+        valueCount++;
+        //reach end will throw EOFException.
+      } catch (EOFException eofException) {
+        root.setRowCount(valueCount);
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    // clean up
+    for (Consumer consumer : consumers) {
+      try {
+        consumer.close();
+      } catch (Exception e) {
+        throw new RuntimeException("Error occurs in close.", e);
+      }
+    }
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index c3a543c..be318f6 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -25,7 +25,7 @@ import org.apache.avro.io.Decoder;
 /**
  * Interface that is used to consume values from avro decoder.
  */
-public interface Consumer {
+public interface Consumer extends AutoCloseable {
 
   /**
    * Consume a specific type value from avro decoder and write it to vector.
@@ -48,4 +48,9 @@ public interface Consumer {
    * Get the vector within the consumer.
    */
   FieldVector getVector();
+
+  /**
+   * Close this consumer when occurs exception to avoid potential leak.
+   */
+  void close() throws Exception;
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 5ac7bd7..05216b1 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -65,4 +65,9 @@ public class NullableTypeConsumer implements Consumer {
     return delegate.getVector();
   }
 
+  @Override
+  public void close() throws Exception {
+    delegate.close();
+  }
+
 }