You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2016/11/02 20:08:36 UTC

[14/18] phoenix git commit: PHOENIX-3416 Memory leak in PhoenixStorageHandler

PHOENIX-3416 Memory leak in PhoenixStorageHandler

Signed-off-by: Sergey Soldatov <ss...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8463d3d8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8463d3d8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8463d3d8

Branch: refs/heads/4.x-HBase-1.1
Commit: 8463d3d870e422e1c50175e6c28b6e51ccdb4488
Parents: d9f730a
Author: Jeongdae Kim <kj...@gmail.com>
Authored: Thu Oct 27 20:50:53 2016 +0900
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Nov 2 13:01:34 2016 -0700

----------------------------------------------------------------------
 .../phoenix/hive/PhoenixStorageHandler.java     | 14 +---
 .../hive/mapreduce/PhoenixInputFormat.java      | 37 +++++----
 .../hive/ppd/PhoenixPredicateDecomposer.java    | 15 +++-
 .../ppd/PhoenixPredicateDecomposerManager.java  | 83 --------------------
 4 files changed, 33 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8463d3d8/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
index e8b5b19..2bc8ace 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
@@ -40,8 +40,6 @@ import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
 import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat;
 import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
-import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
-import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 
 import java.util.List;
@@ -176,19 +174,9 @@ public class PhoenixStorageHandler extends DefaultStorageHandler implements
     public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
                                                   ExprNodeDesc predicate) {
         PhoenixSerDe phoenixSerDe = (PhoenixSerDe) deserializer;
-        String tableName = phoenixSerDe.getTableProperties().getProperty
-                (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
-        String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, tableName);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Decomposing predicate with predicateKey : " + predicateKey);
-        }
-
         List<String> columnNameList = phoenixSerDe.getSerdeParams().getColumnNames();
-        PhoenixPredicateDecomposer predicateDecomposer = PhoenixPredicateDecomposerManager
-                .createPredicateDecomposer(predicateKey, columnNameList);
 
-        return predicateDecomposer.decomposePredicate(predicate);
+        return PhoenixPredicateDecomposer.create(columnNameList).decomposePredicate(predicate);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8463d3d8/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index c5f6d18..fd6a631 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -32,15 +32,14 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -48,7 +47,6 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
 import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
-import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
 import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
 import org.apache.phoenix.hive.query.PhoenixQueryBuilder;
 import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
@@ -62,6 +60,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -83,8 +82,8 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
 
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
-        String tableName = jobConf.get(PhoenixConfigurationUtil.INPUT_TABLE_NAME);
-        List<IndexSearchCondition> conditionList = null;
+        String tableName = jobConf.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
+
         String query;
         String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname,
                 HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
@@ -97,17 +96,17 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
         }
 
         if (PhoenixStorageHandlerConstants.MR.equals(executionEngine)) {
-            String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf,
-                    tableName);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("PredicateKey for MR job : " + predicateKey);
-            }
-
-            PhoenixPredicateDecomposer predicateDecomposer =
-                    PhoenixPredicateDecomposerManager.getPredicateDecomposer(predicateKey);
-            if (predicateDecomposer != null && predicateDecomposer.isCalledPPD()) {
-                conditionList = predicateDecomposer.getSearchConditionList();
+            List<IndexSearchCondition> conditionList = null;
+            String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+            if (filterExprSerialized != null) {
+                ExprNodeGenericFuncDesc filterExpr =
+                        Utilities.deserializeExpression(filterExprSerialized);
+                PhoenixPredicateDecomposer predicateDecomposer =
+                        PhoenixPredicateDecomposer.create(Arrays.asList(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")));
+                predicateDecomposer.decomposePredicate(filterExpr);
+                if (predicateDecomposer.isCalledPPD()) {
+                    conditionList = predicateDecomposer.getSearchConditionList();
+                }
             }
 
             query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8463d3d8/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
index b94e4df..1e65819 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
@@ -42,11 +42,19 @@ public class PhoenixPredicateDecomposer {
 
     private List<IndexSearchCondition> searchConditionList;
 
-    public PhoenixPredicateDecomposer(List<String> columnNameList) {
+    public static PhoenixPredicateDecomposer create(List<String> columnNameList) {
+        return new PhoenixPredicateDecomposer(columnNameList);
+    }
+
+    private PhoenixPredicateDecomposer(List<String> columnNameList) {
         this.columnNameList = columnNameList;
     }
 
     public DecomposedPredicate decomposePredicate(ExprNodeDesc predicate) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("predicate - " + predicate.toString());
+        }
+
         IndexPredicateAnalyzer analyzer = PredicateAnalyzerFactory.createPredicateAnalyzer
                 (columnNameList, getFieldValidator());
         DecomposedPredicate decomposed = new DecomposedPredicate();
@@ -65,6 +73,11 @@ public class PhoenixPredicateDecomposer {
             }
         }
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("decomposed predicate - residualPredicate: " + decomposed.residualPredicate +
+            ", pushedPredicate: " + decomposed.pushedPredicate);
+        }
+
         return decomposed;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8463d3d8/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
deleted file mode 100644
index 2faef73..0000000
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hive.ppd;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Support class that produces PredicateDecomposer for PhoenixStorageHandler
- */
-
-public class PhoenixPredicateDecomposerManager {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixPredicateDecomposerManager.class);
-
-    // In case of absence of WHERE clause, PhoenixPredicateDecomposer is not created because
-    // it's not called method of StorageHandler.decomposePredicate.
-
-    private static final Map<String, List<PhoenixPredicateDecomposer>> PREDICATE_DECOMPOSER_MAP =
-            Maps.newConcurrentMap();
-
-    public static PhoenixPredicateDecomposer createPredicateDecomposer(String predicateKey,
-                                                                       List<String>
-                                                                               columnNameList) {
-        List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
-                (predicateKey);
-        if (predicateDecomposerList == null) {
-            predicateDecomposerList = Lists.newArrayList();
-            PREDICATE_DECOMPOSER_MAP.put(predicateKey, predicateDecomposerList);
-        }
-
-        PhoenixPredicateDecomposer predicateDecomposer = new PhoenixPredicateDecomposer
-                (columnNameList);
-        predicateDecomposerList.add(predicateDecomposer);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" +
-                    predicateKey + "] : " + predicateDecomposer);
-        }
-
-        return predicateDecomposer;
-    }
-
-    public static PhoenixPredicateDecomposer getPredicateDecomposer(String predicateKey) {
-        List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
-                (predicateKey);
-
-        PhoenixPredicateDecomposer predicateDecomposer = null;
-        if (predicateDecomposerList != null && predicateDecomposerList.size() > 0) {
-            predicateDecomposer = predicateDecomposerList.remove(0);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" + predicateKey
-                    + "] : " + predicateDecomposer);
-        }
-
-        return predicateDecomposer;
-    }
-
-    private PhoenixPredicateDecomposerManager() {
-    }
-}