You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2016/11/02 20:08:24 UTC
[02/18] phoenix git commit: PHOENIX-3416 Memory leak in PhoenixStorageHandler
PHOENIX-3416 Memory leak in PhoenixStorageHandler
Signed-off-by: Sergey Soldatov <ss...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1afbcce
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1afbcce
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1afbcce
Branch: refs/heads/master
Commit: e1afbcce546b822e81f7fa0fc37e0de6fe5b8d0c
Parents: 86f5160
Author: Jeongdae Kim <kj...@gmail.com>
Authored: Thu Oct 27 20:50:53 2016 +0900
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Nov 2 12:37:05 2016 -0700
----------------------------------------------------------------------
.../phoenix/hive/PhoenixStorageHandler.java | 14 +---
.../hive/mapreduce/PhoenixInputFormat.java | 37 +++++----
.../hive/ppd/PhoenixPredicateDecomposer.java | 15 +++-
.../ppd/PhoenixPredicateDecomposerManager.java | 83 --------------------
4 files changed, 33 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1afbcce/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
index e8b5b19..2bc8ace 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
@@ -40,8 +40,6 @@ import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
-import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
-import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import java.util.List;
@@ -176,19 +174,9 @@ public class PhoenixStorageHandler extends DefaultStorageHandler implements
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
ExprNodeDesc predicate) {
PhoenixSerDe phoenixSerDe = (PhoenixSerDe) deserializer;
- String tableName = phoenixSerDe.getTableProperties().getProperty
- (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
- String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, tableName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decomposing predicate with predicateKey : " + predicateKey);
- }
-
List<String> columnNameList = phoenixSerDe.getSerdeParams().getColumnNames();
- PhoenixPredicateDecomposer predicateDecomposer = PhoenixPredicateDecomposerManager
- .createPredicateDecomposer(predicateKey, columnNameList);
- return predicateDecomposer.decomposePredicate(predicate);
+ return PhoenixPredicateDecomposer.create(columnNameList).decomposePredicate(predicate);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1afbcce/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index c5f6d18..fd6a631 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -32,15 +32,14 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -48,7 +47,6 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
-import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
import org.apache.phoenix.hive.query.PhoenixQueryBuilder;
import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
@@ -62,6 +60,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -83,8 +82,8 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
- String tableName = jobConf.get(PhoenixConfigurationUtil.INPUT_TABLE_NAME);
- List<IndexSearchCondition> conditionList = null;
+ String tableName = jobConf.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
+
String query;
String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname,
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
@@ -97,17 +96,17 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
}
if (PhoenixStorageHandlerConstants.MR.equals(executionEngine)) {
- String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf,
- tableName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("PredicateKey for MR job : " + predicateKey);
- }
-
- PhoenixPredicateDecomposer predicateDecomposer =
- PhoenixPredicateDecomposerManager.getPredicateDecomposer(predicateKey);
- if (predicateDecomposer != null && predicateDecomposer.isCalledPPD()) {
- conditionList = predicateDecomposer.getSearchConditionList();
+ List<IndexSearchCondition> conditionList = null;
+ String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ if (filterExprSerialized != null) {
+ ExprNodeGenericFuncDesc filterExpr =
+ Utilities.deserializeExpression(filterExprSerialized);
+ PhoenixPredicateDecomposer predicateDecomposer =
+ PhoenixPredicateDecomposer.create(Arrays.asList(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")));
+ predicateDecomposer.decomposePredicate(filterExpr);
+ if (predicateDecomposer.isCalledPPD()) {
+ conditionList = predicateDecomposer.getSearchConditionList();
+ }
}
query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1afbcce/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
index b94e4df..1e65819 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java
@@ -42,11 +42,19 @@ public class PhoenixPredicateDecomposer {
private List<IndexSearchCondition> searchConditionList;
- public PhoenixPredicateDecomposer(List<String> columnNameList) {
+ public static PhoenixPredicateDecomposer create(List<String> columnNameList) {
+ return new PhoenixPredicateDecomposer(columnNameList);
+ }
+
+ private PhoenixPredicateDecomposer(List<String> columnNameList) {
this.columnNameList = columnNameList;
}
public DecomposedPredicate decomposePredicate(ExprNodeDesc predicate) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("predicate - " + predicate.toString());
+ }
+
IndexPredicateAnalyzer analyzer = PredicateAnalyzerFactory.createPredicateAnalyzer
(columnNameList, getFieldValidator());
DecomposedPredicate decomposed = new DecomposedPredicate();
@@ -65,6 +73,11 @@ public class PhoenixPredicateDecomposer {
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("decomposed predicate - residualPredicate: " + decomposed.residualPredicate +
+ ", pushedPredicate: " + decomposed.pushedPredicate);
+ }
+
return decomposed;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1afbcce/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
deleted file mode 100644
index 2faef73..0000000
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hive.ppd;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Support class that produces PredicateDecomposer for PhoenixStorageHandler
- */
-
-public class PhoenixPredicateDecomposerManager {
-
- private static final Log LOG = LogFactory.getLog(PhoenixPredicateDecomposerManager.class);
-
- // In case of absence of WHERE clause, PhoenixPredicateDecomposer is not created because
- // it's not called method of StorageHandler.decomposePredicate.
-
- private static final Map<String, List<PhoenixPredicateDecomposer>> PREDICATE_DECOMPOSER_MAP =
- Maps.newConcurrentMap();
-
- public static PhoenixPredicateDecomposer createPredicateDecomposer(String predicateKey,
- List<String>
- columnNameList) {
- List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
- (predicateKey);
- if (predicateDecomposerList == null) {
- predicateDecomposerList = Lists.newArrayList();
- PREDICATE_DECOMPOSER_MAP.put(predicateKey, predicateDecomposerList);
- }
-
- PhoenixPredicateDecomposer predicateDecomposer = new PhoenixPredicateDecomposer
- (columnNameList);
- predicateDecomposerList.add(predicateDecomposer);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" +
- predicateKey + "] : " + predicateDecomposer);
- }
-
- return predicateDecomposer;
- }
-
- public static PhoenixPredicateDecomposer getPredicateDecomposer(String predicateKey) {
- List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get
- (predicateKey);
-
- PhoenixPredicateDecomposer predicateDecomposer = null;
- if (predicateDecomposerList != null && predicateDecomposerList.size() > 0) {
- predicateDecomposer = predicateDecomposerList.remove(0);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" + predicateKey
- + "] : " + predicateDecomposer);
- }
-
- return predicateDecomposer;
- }
-
- private PhoenixPredicateDecomposerManager() {
- }
-}