You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/08 04:50:45 UTC

[2/5] git commit: DRILL-640: Fix memory leak in limit operator

DRILL-640: Fix memory leak in limit operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f2ff2c9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f2ff2c9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f2ff2c9d

Branch: refs/heads/master
Commit: f2ff2c9d2aad429f042da8250ca6e1ef1f160318
Parents: d870b6e
Author: Mehant Baid <me...@gmail.com>
Authored: Mon May 5 14:48:18 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 7 18:43:13 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/limit/LimitRecordBatch.java   | 12 +++
 .../impl/limit/TestLimitWithExchanges.java      | 29 +++++++
 .../test/resources/limit/limit_exchanges.json   | 87 ++++++++++++++++++++
 3 files changed, 128 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 3f2ec27..ed56e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -81,6 +81,18 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   public IterOutcome next() {
     if(!noEndLimit && recordsLeft <= 0) {
       // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared
+      // Drain the remaining incoming batches and clear their vectors so the memory is not leaked
+      IterOutcome upStream = incoming.next();
+
+      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
+        }
+        upStream = incoming.next();
+      }
+
       return IterOutcome.NONE;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
new file mode 100644
index 0000000..0e4d734
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestLimitWithExchanges extends BaseTestQuery {
+  // Runs the limit_exchanges.json physical plan, in which a limit sits downstream of two exchanges.
+  @Test
+  public void testLimitWithExchanges() throws Exception{
+    testPhysicalFromFile("limit/limit_exchanges.json");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/test/resources/limit/limit_exchanges.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/limit_exchanges.json b/exec/java-exec/src/test/resources/limit/limit_exchanges.json
new file mode 100644
index 0000000..5ad56be
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/limit_exchanges.json
@@ -0,0 +1,87 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "DefaultSqlHandler",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  "graph" : [ {
+    "pop" : "parquet-scan",
+    "@id" : 1,
+    "entries" : [ {
+      "path" : "/tpch/nation.parquet"
+    } ],
+    "storage" : {
+      "type" : "file",
+      "connection" : "classpath:///",
+      "workspaces" : null,
+      "formats" : {
+        "psv" : {
+          "type" : "text",
+          "extensions" : [ "tbl" ],
+          "delimiter" : "|"
+        },
+        "csv" : {
+          "type" : "text",
+          "extensions" : [ "csv" ],
+          "delimiter" : ","
+        },
+        "tsv" : {
+          "type" : "text",
+          "extensions" : [ "tsv" ],
+          "delimiter" : "\t"
+        },
+        "parquet" : {
+          "type" : "parquet"
+        },
+        "json" : {
+          "type" : "json"
+        }
+      }
+    },
+    "format" : {
+      "type" : "parquet"
+    },
+    "selectionRoot" : "/Users/mbaid/sources/drill/tpch-work/sample-data/nationsMF"
+  }, {
+    "pop" : "project",
+    "@id" : 2,
+    "exprs" : [ {
+      "ref" : "`N_NATIONKEY`",
+      "expr" : "`N_NATIONKEY`"
+    } ],
+    "child" : 1
+  }, {
+    "pop" : "hash-to-random-exchange",
+    "@id" : 3,
+    "child" : 2,
+    "expr" : "hash(`N_NATIONKEY`) "
+  }, {
+    "pop" : "union-exchange",
+    "@id" : 4,
+    "child" : 3
+  }, 
+{
+    "pop" : "project",
+    "@id" : 5,
+    "exprs" : [ { 
+      "ref" : "`N_NATIONKEY`",
+      "expr" : "`N_NATIONKEY`"
+    } ],
+    "child" : 4
+  },
+{
+    "pop" : "limit",
+    "@id" : 6,
+    "child" : 5,
+    "first" : 0,
+    "last" : 1
+  }, {
+    "pop" : "screen",
+    "@id" : 7,
+    "child" : 6
+  } ]
+}