You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/16 02:50:05 UTC
[arrow] branch master updated: ARROW-6199: [Java] Avro adapter
avoid potential resource leak.
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dd4532a ARROW-6199: [Java] Avro adapter avoid potential resource leak.
dd4532a is described below
commit dd4532a0cdaccf8e7811086bc5360b13ef9a6c36
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Thu Aug 15 19:49:53 2019 -0700
ARROW-6199: [Java] Avro adapter avoid potential resource leak.
Related to [ARROW-6199](https://issues.apache.org/jira/browse/ARROW-6199).
Currently, the Avro consumer interface has no close API, which may cause resource leaks such as AvroBytesConsumer#cacheBuffer.
To resolve this, make Consumer extend AutoCloseable and create CompositeAvroConsumer to encapsulate the consume and close logic.
Closes #5059 from tianchen92/ARROW-6199 and squashes the following commits:
d60d94c48 <tianchen> fix
42f22da7c <tianchen> clear vectors in close
5b91da75f <tianchen> fix comments
3ffc07600 <tianchen> ARROW-6199: Avro adapter avoid potential resource leak.
Authored-by: tianchen <ni...@alibaba-inc.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../java/org/apache/arrow/AvroToArrowUtils.java | 22 +++----
.../arrow/consumers/AvroBooleanConsumer.java | 5 ++
.../apache/arrow/consumers/AvroBytesConsumer.java | 5 ++
.../apache/arrow/consumers/AvroDoubleConsumer.java | 5 ++
.../apache/arrow/consumers/AvroFloatConsumer.java | 5 ++
.../apache/arrow/consumers/AvroIntConsumer.java | 5 ++
.../apache/arrow/consumers/AvroLongConsumer.java | 5 ++
.../apache/arrow/consumers/AvroNullConsumer.java | 5 ++
.../apache/arrow/consumers/AvroStringConsumer.java | 5 ++
.../apache/arrow/consumers/AvroUnionsConsumer.java | 16 +++--
.../arrow/consumers/CompositeAvroConsumer.java | 69 ++++++++++++++++++++++
.../java/org/apache/arrow/consumers/Consumer.java | 7 ++-
.../arrow/consumers/NullableTypeConsumer.java | 5 ++
13 files changed, 141 insertions(+), 18 deletions(-)
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 25611a5..77f34df 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -20,7 +20,6 @@ package org.apache.arrow;
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -37,6 +36,7 @@ import org.apache.arrow.consumers.AvroLongConsumer;
import org.apache.arrow.consumers.AvroNullConsumer;
import org.apache.arrow.consumers.AvroStringConsumer;
import org.apache.arrow.consumers.AvroUnionsConsumer;
+import org.apache.arrow.consumers.CompositeAvroConsumer;
import org.apache.arrow.consumers.Consumer;
import org.apache.arrow.consumers.NullableTypeConsumer;
import org.apache.arrow.memory.BufferAllocator;
@@ -246,19 +246,15 @@ public class AvroToArrowUtils {
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
- int valueCount = 0;
- while (true) {
- try {
- for (Consumer consumer : consumers) {
- consumer.consume(decoder);
- }
- valueCount++;
- //reach end will throw EOFException.
- } catch (EOFException eofException) {
- root.setRowCount(valueCount);
- break;
- }
+ CompositeAvroConsumer compositeConsumer = null;
+ try {
+ compositeConsumer = new CompositeAvroConsumer(consumers);
+ compositeConsumer.consume(decoder, root);
+ } catch (Exception e) {
+ compositeConsumer.close();
+ throw new RuntimeException("Error occurs while consume process.", e);
}
+
return root;
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index b2fe704..c2876f1 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -63,4 +63,9 @@ public class AvroBooleanConsumer implements Consumer {
return this.vector;
}
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
+
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index 2c649f9..c0cfaec 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -79,4 +79,9 @@ public class AvroBytesConsumer implements Consumer {
public FieldVector getVector() {
return vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // NOTE(review): only the writer is closed; the commit message names cacheBuffer as the leak, which is not touched here - confirm
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 63b2071..6538831 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -62,4 +62,9 @@ public class AvroDoubleConsumer implements Consumer {
public FieldVector getVector() {
return vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index ea752e2..6256a9a 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -62,4 +62,9 @@ public class AvroFloatConsumer implements Consumer {
public FieldVector getVector() {
return this.vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index ab830bc..854c8d0 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -62,4 +62,9 @@ public class AvroIntConsumer implements Consumer {
public FieldVector getVector() {
return this.vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 68acb94..e0095cc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -62,4 +62,9 @@ public class AvroLongConsumer implements Consumer {
public FieldVector getVector() {
return this.vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
index d06e2f5..1e32419 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
@@ -48,4 +48,9 @@ public class AvroNullConsumer implements Consumer {
public FieldVector getVector() {
return this.vector;
}
+
+ @Override
+ public void close() { // cleanup hook required by Consumer; declared without the interface's `throws Exception`
+ vector.close(); // unlike the sibling consumers in this patch, the vector itself is closed rather than a writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index 1719bf7..850d699 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -80,4 +80,9 @@ public class AvroStringConsumer implements Consumer {
public FieldVector getVector() {
return this.vector;
}
+
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the writer propagates to the caller
+ writer.close(); // the only cleanup performed here is closing this consumer's writer
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
index 5277678..b927a5b 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -31,7 +31,7 @@ import org.apache.avro.io.Decoder;
*/
public class AvroUnionsConsumer implements Consumer {
- private Consumer[] indexDelegates;
+ private Consumer[] delegates;
private Types.MinorType[] types;
private UnionWriter writer;
@@ -40,11 +40,11 @@ public class AvroUnionsConsumer implements Consumer {
/**
* Instantiate a AvroUnionConsumer.
*/
- public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
+ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
this.writer = new UnionWriter(vector);
this.vector = vector;
- this.indexDelegates = indexDelegates;
+ this.delegates = delegates;
this.types = types;
}
@@ -53,7 +53,7 @@ public class AvroUnionsConsumer implements Consumer {
int fieldIndex = decoder.readInt();
int position = writer.getPosition();
- Consumer delegate = indexDelegates[fieldIndex];
+ Consumer delegate = delegates[fieldIndex];
vector.setType(position, types[fieldIndex]);
// In UnionVector we need to set sub vector writer position before consume a value
@@ -80,4 +80,12 @@ public class AvroUnionsConsumer implements Consumer {
vector.setValueCount(writer.getPosition());
return this.vector;
}
+
+ @Override
+ public void close() throws Exception {
+ writer.close();
+ for (Consumer delegate: delegates) {
+ delegate.close();
+ }
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
new file mode 100644
index 0000000..0f4b3c3
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Composite consumer which hold all consumers.
+ * It manages the consume and cleanup process.
+ */
+public class CompositeAvroConsumer implements AutoCloseable {
+
+ private final List<Consumer> consumers;
+
+ public CompositeAvroConsumer(List<Consumer> consumers) {
+ this.consumers = consumers;
+ }
+
+ /**
+ * Consume decoder data and write into {@link VectorSchemaRoot}.
+ */
+ public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException {
+ int valueCount = 0;
+ while (true) {
+ try {
+ for (Consumer consumer : consumers) {
+ consumer.consume(decoder);
+ }
+ valueCount++;
+ //reach end will throw EOFException.
+ } catch (EOFException eofException) {
+ root.setRowCount(valueCount);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ // clean up
+ for (Consumer consumer : consumers) {
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Error occurs in close.", e);
+ }
+ }
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index c3a543c..be318f6 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -25,7 +25,7 @@ import org.apache.avro.io.Decoder;
/**
* Interface that is used to consume values from avro decoder.
*/
-public interface Consumer {
+public interface Consumer extends AutoCloseable {
/**
* Consume a specific type value from avro decoder and write it to vector.
@@ -48,4 +48,9 @@ public interface Consumer {
* Get the vector within the consumer.
*/
FieldVector getVector();
+
+ /**
+ * Close this consumer when an exception occurs, to avoid a potential leak.
+ */
+ void close() throws Exception;
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 5ac7bd7..05216b1 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -65,4 +65,9 @@ public class NullableTypeConsumer implements Consumer {
return delegate.getVector();
}
+ @Override
+ public void close() throws Exception { // cleanup hook required by Consumer; a failure from the delegate propagates to the caller
+ delegate.close(); // this wrapper does no cleanup of its own here; it only forwards to the delegate
+ }
+
}