You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/06/20 19:51:47 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5313: All mappers
grab all RegionLocations from .META
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new edee198 PHOENIX-5313: All mappers grab all RegionLocations from .META
edee198 is described below
commit edee198824974a55e5dd31827a7749f8a98d937c
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Wed Jun 19 13:40:27 2019 -0700
PHOENIX-5313: All mappers grab all RegionLocations from .META
---
.../org/apache/phoenix/end2end/MapReduceIT.java | 50 ++++++++++++++------
.../iterate/MapReduceParallelScanGrouper.java | 4 +-
.../phoenix/mapreduce/PhoenixInputFormat.java | 41 ++++++++++++----
.../phoenix/mapreduce/PhoenixRecordReader.java | 12 +++--
.../mapreduce/util/PhoenixMapReduceUtil.java | 20 ++++++++
.../TestingMapReduceParallelScanGrouper.java | 54 ++++++++++++++++++++++
.../mapreduce/PhoenixTestingInputFormat.java | 46 ++++++++++++++++++
7 files changed, 201 insertions(+), 26 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index fb24bb2..2460cd2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -25,20 +25,30 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.sql.*;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Properties;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Test that our MapReduce basic tools work as expected
@@ -48,28 +58,37 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
private static final String STOCK_NAME = "STOCK_NAME";
private static final String RECORDING_YEAR = "RECORDING_YEAR";
private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
- private String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+
+ // We pre-split the table to ensure that we have multiple mappers.
+ // This is used to test scenarios with more than 1 mapper
+ private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
" STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR INTEGER NOT NULL, RECORDINGS_QUARTER " +
- " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR ))";
+ " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+ + "SPLIT ON ('AA')";
private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS "
+ " SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
private static final String MAX_RECORDING = "MAX_RECORDING";
- private String CREATE_STOCK_STATS_TABLE =
+ private static final String CREATE_STOCK_STATS_TABLE =
"CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , "
+ " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
- private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+ private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
- private String TENANT_ID = "1234567890";
+ private static final String TENANT_ID = "1234567890";
@Before
public void setupTables() throws Exception {
}
+ @After
+ public void clearCountersForScanGrouper() { // the grouper's call counter is static, so reset it after every test; testJob asserts it is 0 on entry
+ TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
+ }
+
@Test
public void testNoConditionsOnSelect() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -93,7 +112,8 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
}
- private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+ private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws
+ SQLException, IOException, InterruptedException, ClassNotFoundException {
String stockTableName = generateUniqueName();
String stockStatsTableName = generateUniqueName();
conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
@@ -103,10 +123,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
Job job = Job.getInstance(conf);
if (tenantId != null) {
setInputForTenant(job, tenantId, stockTableName, s);
-
} else {
- PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, s,
- STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+ stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
}
testJob(conn, job, stockTableName, stockStatsTableName, v);
@@ -120,13 +139,15 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
String stockViewName = generateUniqueName();
tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
tenantConn.commit();
- PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockViewName, s,
- STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+ stockViewName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
}
}
private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax)
throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+ assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+ TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
upsertData(conn, stockTableName);
// only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
@@ -154,6 +175,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
assertEquals("Got the wrong stock name!", "AAPL", name);
assertEquals("Max value didn't match the expected!", expectedMax, max, 0);
assertFalse("Should only have stored one row in stats table!", stats.next());
+ assertEquals("There should have been only be 1 call to getRegionBoundaries "
+ + "(corresponding to the driver code)", 1,
+ TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
}
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index b4f81ae..1a1817f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -50,7 +51,8 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
return INSTANCE;
}
- private MapReduceParallelScanGrouper() {}
+ @VisibleForTesting
+ MapReduceParallelScanGrouper() {}
@Override
public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 4153e1a..1272ecd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -78,20 +79,20 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
final QueryPlan queryPlan = getQueryPlan(context,configuration);
@SuppressWarnings("unchecked")
final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
- return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+ return getPhoenixRecordReader(inputClass, configuration, queryPlan);
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
final Configuration configuration = context.getConfiguration();
final QueryPlan queryPlan = getQueryPlan(context,configuration);
- final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
- return splits;
+ return generateSplits(queryPlan, configuration);
}
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException {
- Preconditions.checkNotNull(qplan);
+ private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException {
+ // We must call this in order to initialize the scans and splits from the query plan
+ setupParallelScansFromQueryPlan(qplan);
+ final List<KeyRange> splits = qplan.getSplits();
Preconditions.checkNotNull(splits);
// Get the RegionSizeCalculator
@@ -214,8 +215,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
getQueryServices().getConfiguration(), snapshotName);
}
- // Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
return queryPlan;
}
} catch (Exception exception) {
@@ -225,4 +224,30 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
}
}
+ void setupParallelScansFromQueryPlan(QueryPlan queryPlan) { // package-private hook; PhoenixTestingInputFormat overrides it to supply a different scan grouper
+ setupParallelScansWithScanGrouper(queryPlan, MapReduceParallelScanGrouper.getInstance()); // default: the MapReduceParallelScanGrouper singleton
+ }
+
+ RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+ Configuration configuration, QueryPlan queryPlan) { // package-private factory hook; PhoenixTestingInputFormat overrides it to inject its own grouper
+ return new PhoenixRecordReader<>(inputClass , configuration, queryPlan,
+ MapReduceParallelScanGrouper.getInstance()); // default: the MapReduceParallelScanGrouper singleton
+ }
+
+ /**
+ * Initialize the query plan so it sets up the parallel scans (done by calling queryPlan.iterator(scanGrouper)).
+ * @param queryPlan Query plan corresponding to the select query; must not be null
+ * @param scanGrouper Parallel scan grouper handed to queryPlan.iterator
+ */
+ void setupParallelScansWithScanGrouper(QueryPlan queryPlan, ParallelScanGrouper scanGrouper) {
+ Preconditions.checkNotNull(queryPlan);
+ try {
+ queryPlan.iterator(scanGrouper); // invoked for its effect on the plan; the return value is ignored
+ } catch (SQLException e) {
+ LOGGER.error(String.format("Setting up parallel scans for the query plan failed "
+ + "with error [%s]", e.getMessage()));
+ throw new RuntimeException(e); // rethrown unchecked, keeping the SQLException as the cause
+ }
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index b3493cd..fffb165 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -63,18 +63,22 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRecordReader.class);
protected final Configuration configuration;
protected final QueryPlan queryPlan;
+ private final ParallelScanGrouper scanGrouper;
private NullWritable key = NullWritable.get();
private T value = null;
private Class<T> inputClass;
private ResultIterator resultIterator = null;
private PhoenixResultSet resultSet;
-
- public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+
+ PhoenixRecordReader(Class<T> inputClass, final Configuration configuration,
+ final QueryPlan queryPlan, final ParallelScanGrouper scanGrouper) { // package-private; the scan grouper is supplied by the caller instead of being looked up here
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(queryPlan);
+ Preconditions.checkNotNull(scanGrouper); // fail fast: this grouper is later passed to TableResultIterator
this.inputClass = inputClass;
this.configuration = configuration;
this.queryPlan = queryPlan;
+ this.scanGrouper = scanGrouper;
}
@Override
@@ -141,7 +145,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
new TableResultIterator(
queryPlan.getContext().getConnection().getMutationState(), scan,
scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
- MapReduceParallelScanGrouper.getInstance());
+ this.scanGrouper);
peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
LOGGER.info("Adding TableResultIterator for scan: " + scan);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index f8625da..6c23fd9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -57,6 +57,26 @@ public final class PhoenixMapReduceUtil {
/**
*
+ * @param job MR job instance
+ * @param inputClass DBWritable class
+ * @param inputFormatClass InputFormat class, forwarded to the four-argument setInput overload
+ * @param tableName Input table name
+ * @param conditions Condition clause to be added to the WHERE clause.
+ * Can be <tt>null</tt> if there are no conditions.
+ * @param fieldNames fields being projected for the SELECT query.
+ */
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
+ final Class<? extends InputFormat> inputFormatClass, final String tableName,
+ final String conditions, final String... fieldNames) {
+ final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName); // overload not shown in this hunk; it returns the Configuration updated below
+ if(conditions != null) { // conditions are optional; the setter is skipped when null
+ PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+ }
+ PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
+ }
+
+ /**
+ *
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
new file mode 100644
index 0000000..77b3a7e
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.phoenix.compile.StatementContext;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ParallelScanGrouper implementation used for testing Phoenix-MapReduce Integration; it counts calls to getRegionBoundaries in a static counter
+ */
+public class TestingMapReduceParallelScanGrouper extends MapReduceParallelScanGrouper {
+
+ private static final AtomicInteger numCallsToGetRegionBoundaries = new AtomicInteger(0); // static: one counter shared by all callers; reset with clearNumCallsToGetRegionBoundaries()
+ private static final TestingMapReduceParallelScanGrouper INSTANCE =
+ new TestingMapReduceParallelScanGrouper();
+
+ public static TestingMapReduceParallelScanGrouper getInstance() { // hides the parent's static getInstance() and returns the counting instance
+ return INSTANCE;
+ }
+
+ @Override
+ public List<HRegionLocation> getRegionBoundaries(StatementContext context,
+ byte[] tableName) throws SQLException {
+ List<HRegionLocation> regionLocations = super.getRegionBoundaries(context, tableName);
+ numCallsToGetRegionBoundaries.incrementAndGet(); // reached only when super returns normally, so calls that throw are not counted
+ return regionLocations;
+ }
+
+ public static int getNumCallsToGetRegionBoundaries() {
+ return numCallsToGetRegionBoundaries.get();
+ }
+
+ public static void clearNumCallsToGetRegionBoundaries() {
+ numCallsToGetRegionBoundaries.set(0);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java
new file mode 100644
index 0000000..c0b4bea
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
+
+/**
+ * InputFormat implementation used for testing Phoenix-MapReduce Integration; both overrides substitute TestingMapReduceParallelScanGrouper for the default grouper
+ */
+public class PhoenixTestingInputFormat<T extends DBWritable> extends PhoenixInputFormat<T> {
+
+ @Override
+ void setupParallelScansFromQueryPlan(QueryPlan queryPlan) { // same call as the parent, but with the call-counting grouper
+ setupParallelScansWithScanGrouper(queryPlan,
+ TestingMapReduceParallelScanGrouper.getInstance());
+ }
+
+ @Override
+ RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+ Configuration configuration, QueryPlan queryPlan) { // same construction as the parent, but the reader receives the call-counting grouper
+ return new PhoenixRecordReader<>(inputClass , configuration, queryPlan,
+ TestingMapReduceParallelScanGrouper.getInstance());
+ }
+
+}