You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/03/03 20:20:33 UTC
[1/2] phoenix git commit: PHOENIX-4635 HBase Connection leak in org.apache.phoenix.hive.mapreduce.PhoenixInputFormat
Repository: phoenix
Updated Branches:
refs/heads/4.x-cdh5.11.2 ca5c9d03c -> d14233ccd
PHOENIX-4635 HBase Connection leak in org.apache.phoenix.hive.mapreduce.PhoenixInputFormat
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3fe43771
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3fe43771
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3fe43771
Branch: refs/heads/4.x-cdh5.11.2
Commit: 3fe43771e82862f842cb6019b18f90330ada3e47
Parents: ca5c9d0
Author: Yechao Chen <ch...@gmail.com>
Authored: Fri Mar 2 01:53:04 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Sat Mar 3 20:14:41 2018 +0000
----------------------------------------------------------------------
.../org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fe43771/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index f0a5dd6..b550e32 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -150,7 +150,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
setScanCacheSize(jobConf);
// Adding Localization
- HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf));
+ try (HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
@@ -203,6 +203,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
psplits.add(inputSplit);
}
}
+ }
return psplits;
}
[2/2] phoenix git commit: PHOENIX-4607 - Allow PhoenixInputFormat to use tenant-specific connections
Posted by pb...@apache.org.
PHOENIX-4607 - Allow PhoenixInputFormat to use tenant-specific connections
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d14233cc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d14233cc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d14233cc
Branch: refs/heads/4.x-cdh5.11.2
Commit: d14233ccd3c450a8aff6819d9874b540838968ed
Parents: 3fe4377
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Feb 20 22:28:40 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Sat Mar 3 20:14:52 2018 +0000
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/MapReduceIT.java | 69 +++++++++++++++-----
.../phoenix/mapreduce/PhoenixInputFormat.java | 41 +++++++-----
.../util/PhoenixConfigurationUtil.java | 15 ++++-
.../mapreduce/util/PhoenixMapReduceUtil.java | 5 +-
4 files changed, 92 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 68d9c9c..fb24bb2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -30,11 +30,13 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.sql.*;
+import java.util.Properties;
import static org.junit.Assert.*;
@@ -50,12 +52,19 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
" STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR INTEGER NOT NULL, RECORDINGS_QUARTER " +
" DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR ))";
+ private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS "
+ + " SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
+
private static final String MAX_RECORDING = "MAX_RECORDING";
private String CREATE_STOCK_STATS_TABLE =
"CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , "
+ " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
+
+
private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+ private String TENANT_ID = "1234567890";
+
@Before
public void setupTables() throws Exception {
@@ -63,22 +72,28 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
@Test
public void testNoConditionsOnSelect() throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
- String stockTableName = generateUniqueName();
- String stockStatsTableName = generateUniqueName();
- conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
- conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
- conn.commit();
- final Configuration conf = getUtility().getConfiguration();
- Job job = Job.getInstance(conf);
- PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, null,
- STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
- testJob(job, stockTableName, stockStatsTableName, 91.04);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createAndTestJob(conn, null, 91.04, null);
+ }
}
@Test
public void testConditionsOnSelect() throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createAndTestJob(conn, RECORDING_YEAR + " < 2009", 81.04, null);
+ }
+ }
+
+ @Test
+ public void testWithTenantId() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())){
+ //tenant view will perform the same filter as the select conditions do in testConditionsOnSelect
+ createAndTestJob(conn, null, 81.04, TENANT_ID);
+ }
+
+ }
+
+ private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
String stockTableName = generateUniqueName();
String stockStatsTableName = generateUniqueName();
conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
@@ -86,14 +101,33 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
conn.commit();
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
- PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, RECORDING_YEAR+" < 2009",
+ if (tenantId != null) {
+ setInputForTenant(job, tenantId, stockTableName, s);
+
+ } else {
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, s,
STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
- testJob(job, stockTableName, stockStatsTableName, 81.04);
+ }
+ testJob(conn, job, stockTableName, stockStatsTableName, v);
+
+ }
+
+ private void setInputForTenant(Job job, String tenantId, String stockTableName, String s) throws SQLException {
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT_ID);
+ try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)){
+ PhoenixMapReduceUtil.setTenantId(job, tenantId);
+ String stockViewName = generateUniqueName();
+ tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
+ tenantConn.commit();
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockViewName, s,
+ STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ }
}
- private void testJob(Job job, String stockTableName, String stockStatsTableName, double expectedMax)
+ private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax)
throws SQLException, InterruptedException, IOException, ClassNotFoundException {
- upsertData(stockTableName);
+ upsertData(conn, stockTableName);
// only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
job.getConfiguration().set("mapreduce.framework.name", "local");
@@ -135,8 +169,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
job.setOutputFormatClass(PhoenixOutputFormat.class);
}
- private void upsertData(String stockTableName) throws SQLException {
- Connection conn = DriverManager.getConnection(getUrl());
+ private void upsertData(Connection conn, String stockTableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));
upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 9f16cc1..6093edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -169,32 +169,39 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
try {
final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
final Properties overridingProps = new Properties();
if(txnScnValue==null && currentScnValue!=null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
}
- final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
- final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
- Preconditions.checkNotNull(selectStatement);
- final Statement statement = connection.createStatement();
- final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- // Optimize the query plan so that we potentially use secondary indexes
- final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
- final Scan scan = queryPlan.getContext().getScan();
- // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
- if (txnScnValue!=null) {
- scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+ if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
+ try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
+ final Statement statement = connection.createStatement()) {
+
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(selectStatement);
+
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+ // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
+ if (txnScnValue != null) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+ }
- // setting the snapshot configuration
- String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
- if (snapshotName != null)
+ // setting the snapshot configuration
+ String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+ if (snapshotName != null)
PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
getQueryServices().getConfiguration(), snapshotName);
- // Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
- return queryPlan;
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+ return queryPlan;
+ }
} catch (Exception exception) {
LOG.error(String.format("Failed to get the query plan with error [%s]",
exception.getMessage()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 3c27f65..f3f0415 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -26,6 +26,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -147,6 +148,8 @@ public final class PhoenixConfigurationUtil {
public static final String RESTORE_DIR_KEY = "phoenix.tableSnapshot.restore.dir";
+ public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
+
public enum SchemaType {
TABLE,
QUERY;
@@ -343,7 +346,12 @@ public final class PhoenixConfigurationUtil {
}
final String tableName = getInputTableName(configuration);
Preconditions.checkNotNull(tableName);
- final Connection connection = ConnectionUtil.getInputConnection(configuration);
+ Properties props = new Properties();
+ String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ if (tenantId != null) {
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
final List<String> selectColumnList = getSelectColumnList(configuration);
columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
// we put the encoded column infos in the Configuration for re usability.
@@ -658,4 +666,9 @@ public final class PhoenixConfigurationUtil {
return conn.getQueryServices().getConfiguration()
.getBoolean(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION);
}
+
+ public static void setTenantId(Configuration configuration, String tenantId){
+ Preconditions.checkNotNull(configuration);
+ configuration.set(MAPREDUCE_TENANT_ID, tenantId);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index b0981ef..3462177 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -147,7 +147,6 @@ public final class PhoenixMapReduceUtil {
/**
*
* @param job
- * @param outputClass
* @param tableName Output table
* @param columns List of columns separated by ,
*/
@@ -162,7 +161,6 @@ public final class PhoenixMapReduceUtil {
/**
*
* @param job
- * @param outputClass
* @param tableName Output table
* @param fieldNames fields
*/
@@ -183,5 +181,8 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
}
+ public static void setTenantId(final Job job, final String tenantId) {
+ PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
+ }
}