You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/03/03 20:20:33 UTC

[1/2] phoenix git commit: PHOENIX-4635 HBase Connection leak in org.apache.phoenix.hive.mapreduce.PhoenixInputFormat

Repository: phoenix
Updated Branches:
  refs/heads/4.x-cdh5.11.2 ca5c9d03c -> d14233ccd


PHOENIX-4635 HBase Connection leak in org.apache.phoenix.hive.mapreduce.PhoenixInputFormat

Signed-off-by: Geoffrey Jacoby <gj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3fe43771
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3fe43771
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3fe43771

Branch: refs/heads/4.x-cdh5.11.2
Commit: 3fe43771e82862f842cb6019b18f90330ada3e47
Parents: ca5c9d0
Author: Yechao Chen <ch...@gmail.com>
Authored: Fri Mar 2 01:53:04 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Sat Mar 3 20:14:41 2018 +0000

----------------------------------------------------------------------
 .../org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fe43771/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index f0a5dd6..b550e32 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -150,7 +150,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
         setScanCacheSize(jobConf);
 
         // Adding Localization
-        HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf));
+        try (HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
         RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
                 .getTableRef().getTable().getPhysicalName().toString()));
         RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
@@ -203,6 +203,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
                 psplits.add(inputSplit);
             }
         }
+        }
 
         return psplits;
     }


[2/2] phoenix git commit: PHOENIX-4607 - Allow PhoenixInputFormat to use tenant-specific connections

Posted by pb...@apache.org.
PHOENIX-4607 - Allow PhoenixInputFormat to use tenant-specific connections


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d14233cc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d14233cc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d14233cc

Branch: refs/heads/4.x-cdh5.11.2
Commit: d14233ccd3c450a8aff6819d9874b540838968ed
Parents: 3fe4377
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Feb 20 22:28:40 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Sat Mar 3 20:14:52 2018 +0000

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/MapReduceIT.java | 69 +++++++++++++++-----
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 41 +++++++-----
 .../util/PhoenixConfigurationUtil.java          | 15 ++++-
 .../mapreduce/util/PhoenixMapReduceUtil.java    |  5 +-
 4 files changed, 92 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 68d9c9c..fb24bb2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -30,11 +30,13 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.sql.*;
+import java.util.Properties;
 
 import static org.junit.Assert.*;
 
@@ -50,12 +52,19 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
             " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER " +
             " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR ))";
 
+    private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS "
+        + " SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
+
     private static final String MAX_RECORDING = "MAX_RECORDING";
     private  String CREATE_STOCK_STATS_TABLE =
             "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , "
                     + " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
+
+
     private String UPSERT = "UPSERT into %s values (?, ?, ?)";
 
+    private String TENANT_ID = "1234567890";
+
     @Before
     public void setupTables() throws Exception {
 
@@ -63,22 +72,28 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testNoConditionsOnSelect() throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
-        String stockTableName = generateUniqueName();
-        String stockStatsTableName = generateUniqueName();
-        conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
-        conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
-        conn.commit();
-        final Configuration conf = getUtility().getConfiguration();
-        Job job = Job.getInstance(conf);
-        PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, null,
-                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
-        testJob(job, stockTableName, stockStatsTableName, 91.04);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createAndTestJob(conn, null, 91.04, null);
+        }
     }
 
     @Test
     public void testConditionsOnSelect() throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createAndTestJob(conn, RECORDING_YEAR + "  < 2009", 81.04, null);
+        }
+    }
+
+    @Test
+    public void testWithTenantId() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())){
+            //tenant view will perform the same filter as the select conditions do in testConditionsOnSelect
+            createAndTestJob(conn, null, 81.04, TENANT_ID);
+        }
+
+    }
+
+    private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
         String stockTableName = generateUniqueName();
         String stockStatsTableName = generateUniqueName();
         conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
@@ -86,14 +101,33 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         conn.commit();
         final Configuration conf = getUtility().getConfiguration();
         Job job = Job.getInstance(conf);
-        PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, RECORDING_YEAR+"  < 2009",
+        if (tenantId != null) {
+            setInputForTenant(job, tenantId, stockTableName, s);
+
+        } else {
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, s,
                 STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
-        testJob(job, stockTableName, stockStatsTableName, 81.04);
+        }
+        testJob(conn, job, stockTableName, stockStatsTableName, v);
+
+    }
+
+    private void setInputForTenant(Job job, String tenantId, String stockTableName, String s) throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT_ID);
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)){
+            PhoenixMapReduceUtil.setTenantId(job, tenantId);
+            String stockViewName = generateUniqueName();
+            tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
+            tenantConn.commit();
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockViewName, s,
+                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        }
     }
 
-    private void testJob(Job job, String stockTableName, String stockStatsTableName, double expectedMax)
+    private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax)
             throws SQLException, InterruptedException, IOException, ClassNotFoundException {
-        upsertData(stockTableName);
+        upsertData(conn, stockTableName);
 
         // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
         job.getConfiguration().set("mapreduce.framework.name", "local");
@@ -135,8 +169,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         job.setOutputFormatClass(PhoenixOutputFormat.class);
     }
 
-    private void upsertData(String stockTableName) throws SQLException {
-        Connection conn = DriverManager.getConnection(getUrl());
+    private void upsertData(Connection conn, String stockTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));
         upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
         upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 9f16cc1..6093edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -169,32 +169,39 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
         try {
             final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
             final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
             final Properties overridingProps = new Properties();
             if(txnScnValue==null && currentScnValue!=null) {
                 overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
             }
-            final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
-            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
-            Preconditions.checkNotNull(selectStatement);
-            final Statement statement = connection.createStatement();
-            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-            // Optimize the query plan so that we potentially use secondary indexes            
-            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-            // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver 
-            if (txnScnValue!=null) {
-                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+            if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+                overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
+            try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
+                 final Statement statement = connection.createStatement()) {
+
+              final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+              Preconditions.checkNotNull(selectStatement);
+
+              final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+              // Optimize the query plan so that we potentially use secondary indexes
+              final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+              final Scan scan = queryPlan.getContext().getScan();
+              // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
+              if (txnScnValue != null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+              }
 
-            // setting the snapshot configuration
-            String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
-            if (snapshotName != null)
+              // setting the snapshot configuration
+              String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+              if (snapshotName != null)
                 PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
                     getQueryServices().getConfiguration(), snapshotName);
 
-            // Initialize the query plan so it sets up the parallel scans
-            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
-            return queryPlan;
+              // Initialize the query plan so it sets up the parallel scans
+              queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+              return queryPlan;
+            }
         } catch (Exception exception) {
             LOG.error(String.format("Failed to get the query plan with error [%s]",
                 exception.getMessage()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 3c27f65..f3f0415 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -26,6 +26,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -147,6 +148,8 @@ public final class PhoenixConfigurationUtil {
 
     public static final String RESTORE_DIR_KEY = "phoenix.tableSnapshot.restore.dir";
 
+    public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -343,7 +346,12 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getInputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getInputConnection(configuration);
+        Properties props = new Properties();
+        String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
         final List<String> selectColumnList = getSelectColumnList(configuration);
         columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
         // we put the encoded column infos in the Configuration for re usability.
@@ -658,4 +666,9 @@ public final class PhoenixConfigurationUtil {
 	    return conn.getQueryServices().getConfiguration()
 	            .getBoolean(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION);
 	}
+
+    public static void setTenantId(Configuration configuration, String tenantId){
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_TENANT_ID, tenantId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d14233cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index b0981ef..3462177 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -147,7 +147,6 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
-     * @param outputClass
      * @param tableName  Output table
      * @param columns    List of columns separated by ,
      */
@@ -162,7 +161,6 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
-     * @param outputClass
      * @param tableName  Output table
      * @param fieldNames fields
      */
@@ -183,5 +181,8 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
     }
 
+    public static void setTenantId(final Job job, final String tenantId) {
+        PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
+    }
 
 }