Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2022/05/26 18:09:10 UTC

[GitHub] [phoenix-connectors] joshelser commented on a diff in pull request #79: PHOENIX-6698 hive-connector will take long time to generate splits fo…

joshelser commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r882833463


##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(

Review Comment:
   ```suggestion
                   final int parallelism = jobConf.getInt(
   ```



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
-
-        for (List<Scan> scans : qplan.getScans()) {
-            PhoenixInputSplit inputSplit;
-
-            HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-            String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
-
-            if (splitByStats) {
-                for (Scan aScan : scans) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and  regionLocation : " + regionLocation);
-                    }
+                List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-                    inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
-                            regionLocation, regionSize);
-                    inputSplit.setQuery(query);
-                    psplits.add(inputSplit);
+                try {
+                    for (final List<Scan> scans : qplan.getScans()) {
+                        Future<List<InputSplit>> task = executorService.submit(
+                                new Callable<List<InputSplit>>() {
+                                    @Override public List<InputSplit> call() throws Exception {
+                                        return generateSplitsInternal(query, scans, splitByStats,
+                                                connection, regionLocator, tablePaths);
+                                    }
+                                });
+                        tasks.add(task);
+                    }
+                    for (Future<List<InputSplit>> task : tasks) {
+                        psplits.addAll(task.get());
+                    }
+                } catch (ExecutionException | InterruptedException exception) {
+                    throw new IOException("Failed to Generate Input Splits in Parallel, reason:",
+                            exception);

Review Comment:
   It would be better to unwrap the ExecutionException and rethrow its cause. The cause may already be an IOException, in which case you can cast it and throw it directly rather than rewrapping it in another IOException.
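
   A minimal sketch of that unwrapping, written against plain `java.util.concurrent` rather than the PR's own classes. The class `SplitTaskRunner` and the method `runAll` are hypothetical names used only for illustration, and the handling of causes other than IOException is an assumption, not something the review prescribes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the PR: runs tasks on a fixed pool and
// rethrows the original failure instead of a wrapper around it.
class SplitTaskRunner {

    static <T> List<T> runAll(List<Callable<List<T>>> work, int threads) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (Callable<List<T>> task : work) {
                futures.add(pool.submit(task));
            }
            List<T> results = new ArrayList<>();
            for (Future<List<T>> future : futures) {
                results.addAll(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                // Already the right type: cast and rethrow without rewrapping.
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            // Any other checked exception still has to be wrapped to fit the signature.
            throw new IOException("Failed to generate input splits in parallel", cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while generating input splits", e);
        } finally {
            // Stop the pool on both the success and the failure path.
            pool.shutdownNow();
        }
    }
}
```

   With this shape, a caller that catches IOException sees the exception thrown inside the task itself, with its original message and stack trace, rather than a generic wrapper.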



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);

Review Comment:
   ```suggestion
                   LOG.info("Generating Input Splits in Parallel with {} threads", parallism);
   ```



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class);
+    private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT);
+    private static final String DDL = "CREATE TABLE " + TABLE_NAME
+            + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+    private static final int SPLITS = 256;
+
+    // This test will create phoenix table with 128 splits and compare performance of
+    // serial split-generation method and parallel split-generation method.
+    @Test
+    public void testGetSplitsSerialOrParallel() throws IOException, SQLException {
+        PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+                new PhoenixInputFormat<PhoenixRecordWritable>();
+        long start;
+        long end;
+        // create table with N splits
+        System.out.println(
+                String.format("generate testing table with %s splits", String.valueOf(SPLITS)));
+        setupTestTable();
+        // setup configuration required for PhoenixInputFormat
+        Configuration conf = getUtility().getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        configureTestInput(jobConf);
+        inputFormat.getSplits(jobConf, SPLITS);
+        InputSplit[] inputSplitsSerial;
+        // test get splits in serial
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0");
+        inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInSerial = end - start;
+        System.out.println(String.format("get split in serial requires:%s ms",
+                String.valueOf(durationInSerial)));
+
+        // test get splits in parallel
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1");
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,"24");
+        InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInParallel = end - start;
+
+        System.out.println(String.format("get split in parallel requires:%s ms",
+                String.valueOf(durationInParallel)));
+
+        // Test if performance of parallel method is better than serial method
+        Assert.assertTrue(durationInParallel < durationInSerial);

Review Comment:
   This will result in flaky tests: the environments that run this test are guaranteed to be non-deterministic, so a timing comparison can fail for reasons unrelated to the code. Unit tests should verify functional correctness, not performance.
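
   One way to test correctness instead of timing is to assert that the serial and parallel paths produce the same splits. The sketch below is only an illustration under stated assumptions: strings stand in for InputSplit objects, and `SplitEquivalenceSketch`, `toSplit`, `generateSerial`, `generateParallel` and `assertSameSplits` are hypothetical names, not methods of PhoenixInputFormat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for split generation; strings play the role of InputSplit.
class SplitEquivalenceSketch {

    // Stand-in for the per-scan work: one "split" per start key.
    static String toSplit(String startKey) {
        return "split[" + startKey + "]";
    }

    static List<String> generateSerial(List<String> startKeys) {
        List<String> out = new ArrayList<>();
        for (String key : startKeys) {
            out.add(toSplit(key));
        }
        return out;
    }

    static List<String> generateParallel(List<String> startKeys, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (final String key : startKeys) {
                futures.add(pool.submit(new Callable<String>() {
                    @Override
                    public String call() {
                        return toSplit(key);
                    }
                }));
            }
            List<String> out = new ArrayList<>();
            // Collecting futures in submission order keeps the output order stable.
            for (Future<String> future : futures) {
                out.add(future.get());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    // Deterministic check: same content and order, no reliance on wall-clock time.
    static void assertSameSplits(List<String> serial, List<String> parallel) {
        if (!serial.equals(parallel)) {
            throw new AssertionError("serial=" + serial + " parallel=" + parallel);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> keys = Arrays.asList("a", "g", "m", "t");
        assertSameSplits(generateSerial(keys), generateParallel(keys, 3));
        System.out.println("serial and parallel outputs match");
    }
}
```

   A check of this kind passes or fails on the content of the result alone, so it behaves the same on a loaded CI host as on a developer machine. Timing numbers can still be logged for information without being asserted on.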



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {

Review Comment:
   Do the existing phoenix-hive tests activate your new property and implicitly validate that it is functional? I think we have some test classes, but do those tests create multi-region Phoenix tables (or tables with enough data to have multiple guideposts)?


