Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/19 23:36:58 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 190828b PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
190828b is described below
commit 190828bf66679d2768ffd1a5713023e8b6a5ec12
Author: Kadir <ko...@salesforce.com>
AuthorDate: Tue Jan 29 17:14:02 2019 -0800
PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
.../phoenix/end2end/IndexBuildTimestampIT.java | 248 +++++++++++++++++++++
.../org/apache/phoenix/end2end/IndexToolIT.java | 36 +++
.../phoenix/end2end/TableDDLPermissionsIT.java | 8 -
.../org/apache/phoenix/rpc/PhoenixServerRpcIT.java | 6 -
.../phoenix/compile/ServerBuildIndexCompiler.java | 138 ++++++++++++
.../org/apache/phoenix/index/IndexMaintainer.java | 3 +-
.../phoenix/mapreduce/PhoenixInputFormat.java | 4 +-
.../phoenix/mapreduce/PhoenixRecordReader.java | 4 +-
.../PhoenixServerBuildIndexInputFormat.java | 111 +++++++++
.../apache/phoenix/mapreduce/index/IndexTool.java | 243 ++++++++++++--------
.../index/PhoenixServerBuildIndexMapper.java | 75 +++++++
.../mapreduce/util/PhoenixConfigurationUtil.java | 25 +++
.../mapreduce/util/PhoenixMapReduceUtil.java | 27 +++
.../org/apache/phoenix/schema/MetaDataClient.java | 16 +-
14 files changed, 820 insertions(+), 124 deletions(-)
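
The gist of the fix is that index rows built from existing data must carry the write timestamps of the data rows they derive from, not the wall-clock time of the build. A minimal sketch of how that property can be checked against the index's physical HBase table, mirroring the new IT below (indexHTable, buildStart, and buildEnd are placeholders):

    static void assertNoBuildTimeCells(Table indexHTable, long buildStart,
            long buildEnd) throws IOException {
        // After the build, a raw scan restricted to the build-time window should
        // find no cells, because index cells reuse the data rows' timestamps.
        Scan scan = new Scan();
        scan.setTimeRange(buildStart, buildEnd);
        try (ResultScanner scanner = indexHTable.getScanner(scan)) {
            assert scanner.next() == null;
        }
    }
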
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
new file mode 100644
index 0000000..50be0b8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT {
+ private final boolean localIndex;
+ private final boolean async;
+ private final boolean view;
+ private final String tableDDLOptions;
+
+ public IndexBuildTimestampIT(boolean mutable, boolean localIndex,
+ boolean async, boolean view) {
+ this.localIndex = localIndex;
+ this.async = async;
+ this.view = view;
+ StringBuilder optionBuilder = new StringBuilder();
+ if (!mutable) {
+ optionBuilder.append(" IMMUTABLE_ROWS=true ");
+ }
+ optionBuilder.append(" SPLIT ON(1,2)");
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ IndexToolIT.setup();
+ }
+
+ @Parameters(
+ name = "mutable={0},localIndex={1},async={2},view={3}")
+ public static Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(8);
+ boolean[] booleans = new boolean[]{false, true};
+ for (boolean mutable : booleans) {
+ for (boolean localIndex : booleans) {
+ for (boolean async : booleans) {
+ for (boolean view : booleans) {
+ list.add(new Object[]{mutable, localIndex, async, view});
+ }
+ }
+ }
+ }
+ return list;
+ }
+
+ public static void assertExplainPlan(Connection conn, boolean localIndex, String selectSql,
+ String dataTableFullName, String indexTableFullName) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+ IndexToolIT.assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName);
+ }
+
+ private class MyClock extends EnvironmentEdge {
+ long initialTime;
+ long delta;
+
+ public MyClock(long delta) {
+ initialTime = System.currentTimeMillis() + delta;
+ this.delta = delta;
+ }
+
+ @Override
+ public long currentTime() {
+ return System.currentTimeMillis() + delta;
+ }
+
+ public long initialTime() {
+ return initialTime;
+ }
+ }
+
+ private void populateTable(String tableName, MyClock clock1, MyClock clock2) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val varchar(10), ts timestamp)" + tableDDLOptions);
+
+ EnvironmentEdgeManager.injectEdge(clock1);
+ conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())");
+ conn.commit();
+
+ EnvironmentEdgeManager.injectEdge(clock2);
+ conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())");
+ conn.commit();
+ conn.close();
+
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(clock1.initialTime()));
+ conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
+ assertFalse(rs.next());
+ conn.close();
+
+ props.setProperty("CurrentSCN", Long.toString(clock2.initialTime()));
+ conn = DriverManager.getConnection(getUrl(), props);
+ rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+ assertTrue(rs.next());
+ assertEquals("aaa", rs.getString(1));
+ assertEquals("abc", rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertFalse(rs.next());
+ conn.close();
+
+ props.setProperty("CurrentSCN", Long.toString(clock2.currentTime()));
+ conn = DriverManager.getConnection(getUrl(), props);
+ rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+ assertTrue(rs.next());
+ assertEquals("aaa", rs.getString(1));
+ assertEquals("abc", rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertTrue(rs.next());
+ assertEquals("bbb", rs.getString(1));
+ assertEquals("bcd", rs.getString(2));
+ assertNotNull(rs.getDate(3));
+ assertFalse(rs.next());
+ conn.close();
+ }
+
+ @Test
+ public void testCellTimestamp() throws Exception {
+ EnvironmentEdgeManager.reset();
+ MyClock clock1 = new MyClock(100000);
+ MyClock clock2 = new MyClock(200000);
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName, clock1, clock2);
+
+ MyClock clock3 = new MyClock(300000);
+ EnvironmentEdgeManager.injectEdge(clock3);
+
+ Properties props = new Properties();
+ props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, "true");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ String viewName = null;
+ if (view) {
+ viewName = generateUniqueName();
+ conn.createStatement().execute("CREATE VIEW "+ viewName + " AS SELECT * FROM " +
+ dataTableName);
+ }
+ String indexName = generateUniqueName();
+ conn.createStatement().execute("CREATE "+ (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " on " +
+ (view ? viewName : dataTableName) + " (val) include (ts)" + (async ? "ASYNC" : ""));
+
+ conn.close();
+
+ if (async) {
+ // run the index MR job.
+ IndexToolIT.runIndexTool(true, false, null, (view ? viewName : dataTableName), indexName);
+ }
+
+ // Verify the index timestamps via Phoenix
+ String selectSql = String.format("SELECT * FROM %s WHERE val = 'abc'", (view ? viewName : dataTableName));
+ conn = DriverManager.getConnection(getUrl());
+ // assert we are pulling from index table
+ assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue (rs.next());
+ assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock2.initialTime() &&
+ rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock1.initialTime());
+
+ selectSql =
+ String.format("SELECT * FROM %s WHERE val = 'bcd'", (view ? viewName : dataTableName));
+ // assert we are pulling from index table
+ assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue (rs.next());
+ assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock3.initialTime() &&
+ rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock2.initialTime()
+ );
+ assertFalse (rs.next());
+
+ // Verify the index timestamps via HBase
+ PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+ Table table = conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(pIndexTable.getPhysicalName().getBytes());
+
+ Scan scan = new Scan();
+ scan.setTimeRange(clock3.initialTime(), clock3.currentTime());
+ ResultScanner scanner = table.getScanner(scan);
+ assertTrue(scanner.next() == null);
+
+
+ scan = new Scan();
+ scan.setTimeRange(clock2.initialTime(), clock3.initialTime());
+ scanner = table.getScanner(scan);
+ assertTrue(scanner.next() != null);
+
+
+ scan = new Scan();
+ scan.setTimeRange(clock1.initialTime(), clock2.initialTime());
+ scanner = table.getScanner(scan);
+ assertTrue(scanner.next() != null);
+ conn.close();
+ EnvironmentEdgeManager.reset();
+ }
+}
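
The MyClock class above is an instance of Phoenix's EnvironmentEdge clock-injection hook; a minimal standalone sketch of the pattern, using only the API the test already imports:

    // Replace Phoenix's time source so UPSERTs get deterministic cell timestamps.
    EnvironmentEdge skewedClock = new EnvironmentEdge() {
        @Override
        public long currentTime() {
            return System.currentTimeMillis() + 100000L; // pretend to be 100s ahead
        }
    };
    EnvironmentEdgeManager.injectEdge(skewedClock);
    try {
        // Phoenix-side writes performed here are stamped by skewedClock
    } finally {
        EnvironmentEdgeManager.reset(); // always restore the real clock
    }
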
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c185f39..d25cf6f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -47,11 +47,17 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -498,6 +504,32 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]);
}
+ private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTableName, String indexTableName, String tenantId) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ if (tenantId != null) {
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), props)) {
+ PTable dataTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, dataTableName));
+ PTable indexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName));
+ boolean transactional = dataTable.isTransactional();
+ boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
+
+ if (directApi) {
+ if ((localIndex || !transactional) && !useSnapshot) {
+ assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
+ } else {
+ assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
+ }
+ }
+ else {
+ assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class);
+ }
+ }
+ }
+
public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
@@ -515,6 +547,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
cmdArgList.addAll(Arrays.asList(additionalArgs));
int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
+
+ if (expectedStatus == 0) {
+ verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+ }
assertEquals(expectedStatus, status);
}
}
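
Distilled from verifyMapper above, the mapper-selection rule this patch introduces can be stated as a single predicate (a sketch, not code from the patch):

    // Server-side build is chosen for direct-API runs over local or
    // non-transactional indexes when no snapshot is used; otherwise the
    // pre-existing client-side import mappers remain in play.
    static boolean usesServerBuild(boolean directApi, boolean useSnapshot,
            boolean localIndex, boolean transactional) {
        return directApi && (localIndex || !transactional) && !useSnapshot;
    }
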
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 86a6b60..d29056d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -204,14 +204,7 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
// we should be able to read the data from another index as well to which we have not given any access to
// this user
- verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser);
verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser);
- verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser);
- verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser);
-
- // data table user should be able to read new index
- verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1);
- verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1);
verifyAllowed(readTable(phoenixTableName), regularUser1);
verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1);
@@ -220,7 +213,6 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
verifyAllowed(dropView(viewName1), regularUser1);
verifyAllowed(dropView(viewName2), regularUser1);
verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1);
- verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1);
verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1);
verifyAllowed(dropTable(phoenixTableName), regularUser1);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 1c18667..ab05c16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -140,12 +140,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
assertEquals("k1", rs.getString(1));
assertEquals("v2", rs.getString(2));
assertFalse(rs.next());
-
- TestPhoenixIndexRpcSchedulerFactory.reset();
- createIndex(conn, indexName + "_1");
- // Verify that that index queue is not used since running upsert select on server side has been disabled
- // See PHOENIX-4171
- Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class));
}
finally {
conn.close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
new file mode 100644
index 0000000..7d1c1b4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+
+/**
+ * Class that compiles the plan that generates the initial data values for an
+ * index table after a DDL command.
+ */
+public class ServerBuildIndexCompiler {
+ private final PhoenixConnection connection;
+ private final String tableName;
+ private PTable dataTable;
+ private QueryPlan plan;
+
+ private class RowCountMutationPlan extends BaseMutationPlan {
+ private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
+ super(context, operation);
+ }
+ @Override
+ public MutationState execute() throws SQLException {
+ connection.getMutationState().commitDDLFence(dataTable);
+ Tuple tuple = plan.iterator().next();
+ long rowCount = 0;
+ if (tuple != null) {
+ Cell kv = tuple.getValue(0);
+ ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ }
+ // The contract is to return a MutationState that contains the number of rows modified. In this
+ // case, it's the number of rows in the data table which corresponds to the number of index
+ // rows that were added.
+ return new MutationState(0, 0, connection, rowCount);
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return plan;
+ }
+ };
+
+ public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) {
+ this.connection = connection;
+ this.tableName = tableName;
+ }
+
+ public MutationPlan compile(PTable index) throws SQLException {
+ try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+ String query = "SELECT count(*) FROM " + tableName;
+ this.plan = statement.compileQuery(query);
+ TableRef tableRef = plan.getTableRef();
+ Scan scan = plan.getContext().getScan();
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ dataTable = tableRef.getTable();
+ if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) {
+ throw new IllegalArgumentException(
+ "ServerBuildIndexCompiler does not support global indexes on transactional tables");
+ }
+ IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
+ // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
+ // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
+ // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index
+ // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the
+ // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver
+ // that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute
+ // that will be passed as a mutation attribute for the scanned mutations that will be applied on
+ // the index table possibly remotely
+ if (index.getIndexType() == PTable.IndexType.LOCAL) {
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ } else {
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+ ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+ }
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns that contribute to the index.
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+ for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+ if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ scan.addFamily(columnRef.getFamily());
+ } else {
+ scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+ }
+ }
+
+ if (dataTable.isTransactional()) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+ }
+
+ // Go through MutationPlan abstraction so that we can create local indexes
+ // with a connectionless connection (which makes testing easier).
+ return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+ }
+ }
+}
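
A sketch of how this compiler is meant to be driven (the MetaDataClient hunk at the end of this patch follows the same flow; MY_TABLE and MY_IDX are placeholder names):

    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    PTable index = PhoenixRuntime.getTable(pconn, "MY_IDX");
    MutationPlan plan = new ServerBuildIndexCompiler(pconn, "MY_TABLE").compile(index);
    // Executing the plan runs the count(*) scan whose region-server observer
    // writes the index rows; the returned MutationState carries the row count.
    MutationState state = plan.execute();
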
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index d3d14d8..cb09dc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -964,8 +964,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// add the keyvalue for the empty row
put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
- // set the value to the empty column name
- dataEmptyKeyValueRef.getQualifierWritable()));
+ QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index c815119..136548e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -74,7 +74,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
@Override
public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
-
final Configuration configuration = context.getConfiguration();
final QueryPlan queryPlan = getQueryPlan(context,configuration);
@SuppressWarnings("unchecked")
@@ -164,7 +163,8 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
* @throws IOException
* @throws SQLException
*/
- private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) {
+ protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+ throws IOException {
Preconditions.checkNotNull(context);
try {
final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index add8c31..3c4db8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -61,8 +61,8 @@ import com.google.common.collect.Lists;
public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
- private final Configuration configuration;
- private final QueryPlan queryPlan;
+ protected final Configuration configuration;
+ protected final QueryPlan queryPlan;
private NullWritable key = NullWritable.get();
private T value = null;
private Class<T> inputClass;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
new file mode 100644
index 0000000..f8ec393
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.util.*;
+
+import com.google.common.base.Preconditions;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+
+/**
+ * {@link InputFormat} implementation from Phoenix for building indexes.
+ *
+ */
+public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat {
+ QueryPlan queryPlan = null;
+
+ private static final Log LOG = LogFactory.getLog(PhoenixServerBuildIndexInputFormat.class);
+
+ /**
+ * instantiated by framework
+ */
+ public PhoenixServerBuildIndexInputFormat() {
+ }
+
+ @Override
+ protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+ throws IOException {
+ Preconditions.checkNotNull(context);
+ if (queryPlan != null) {
+ return queryPlan;
+ }
+ final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ final Properties overridingProps = new Properties();
+ if(txnScnValue==null && currentScnValue!=null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+ overridingProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, currentScnValue);
+ }
+ if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ String dataTableFullName = getIndexToolDataTableName(configuration);
+ String indexTableFullName = getIndexToolIndexTableName(configuration);
+
+ try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
+ PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+ Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+ PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
+ ServerBuildIndexCompiler compiler =
+ new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
+ MutationPlan plan = compiler.compile(indexTable);
+ Scan scan = plan.getContext().getScan();
+
+ try {
+ scan.setTimeRange(0, scn);
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ queryPlan = plan.getQueryPlan();
+ // Since we can't set an SCN on connections with transactions, set the TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
+ if (txnScnValue != null) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+ }
+
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+ return queryPlan;
+ } catch (Exception exception) {
+ LOG.error(String.format("Failed to get the query plan with error [%s]",
+ exception.getMessage()));
+ throw new RuntimeException(exception);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index dc361c9..d1d6ca2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -28,7 +28,10 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +77,7 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -106,6 +110,22 @@ public class IndexTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
+ private String schemaName;
+ private String dataTable;
+ private String indexTable;
+ private boolean isPartialBuild;
+ private String qDataTable;
+ private String qIndexTable;
+ private boolean useDirectApi;
+ private boolean useSnapshot;
+ private boolean isLocalIndexBuild;
+ private PTable pIndexTable;
+ private PTable pDataTable;
+ private String tenantId;
+ private Job job;
+
+
+
private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
"Phoenix schema name (optional)");
private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
@@ -247,18 +267,31 @@ public class IndexTool extends Configured implements Tool {
}
- public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild,
- boolean useSnapshot, String tenantId) throws Exception {
+ public Job getJob() throws Exception {
if (isPartialBuild) {
- return configureJobForPartialBuild(schemaName, dataTable, tenantId);
+ return configureJobForPartialBuild();
} else {
- return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId);
+ long maxTimeRange = pIndexTable.getTimeStamp() + 1;
+ // this is set to ensure index tables remain consistent post population.
+
+ if (pDataTable.isTransactional()) {
+ configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+ Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+ configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
+ }
+ configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+ Long.toString(maxTimeRange));
+ if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) {
+ return configureJobForAsyncIndex();
+ }
+ else {
+ //Local and non-transactional global indexes to be built on the server side
+ return configureJobForServerBuildIndex();
+ }
}
}
-
- private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception {
- final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
+
+ private Job configureJobForPartialBuild() throws Exception {
connection = ConnectionUtil.getInputConnection(configuration);
long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
PTable indexWithMinDisableTimestamp = null;
@@ -266,7 +299,7 @@ public class IndexTool extends Configured implements Tool {
//Get Indexes in building state, minDisabledTimestamp
List<String> disableIndexes = new ArrayList<String>();
List<PTable> disabledPIndexes = new ArrayList<PTable>();
- for (PTable index : pdataTable.getIndexes()) {
+ for (PTable index : pDataTable.getIndexes()) {
if (index.getIndexState().equals(PIndexState.BUILDING)) {
disableIndexes.add(index.getTableName().getString());
disabledPIndexes.add(index);
@@ -299,10 +332,10 @@ public class IndexTool extends Configured implements Tool {
//serialize index maintainer in job conf with Base64 TODO: Need to find better way to serialize them in conf.
List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
for (PTable index : disabledPIndexes) {
- maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class)));
+ maintainers.add(index.getIndexMaintainer(pDataTable, connection.unwrap(PhoenixConnection.class)));
}
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
- IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
+ IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
if (tenantId != null) {
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
@@ -313,15 +346,15 @@ public class IndexTool extends Configured implements Tool {
scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
scan.setRaw(true);
scan.setCacheBlocks(false);
- if (pdataTable.isTransactional()) {
- long maxTimeRange = pdataTable.getTimeStamp() + 1;
+ if (pDataTable.isTransactional()) {
+ long maxTimeRange = pDataTable.getTimeStamp() + 1;
scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
}
- String physicalTableName=pdataTable.getPhysicalName().getString();
- final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString());
+ String physicalTableName=pDataTable.getPhysicalName().getString();
+ final String jobName = String.format("Phoenix Indexes build for " + pDataTable.getName().toString());
PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName);
@@ -338,7 +371,7 @@ public class IndexTool extends Configured implements Tool {
null, job);
TableMapReduceUtil.initCredentials(job);
TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName));
- return configureSubmittableJobUsingDirectApi(job, true);
+ return configureSubmittableJobUsingDirectApi(job);
}
private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException {
@@ -368,40 +401,15 @@ public class IndexTool extends Configured implements Tool {
}
- private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId)
- throws Exception {
- final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- final String qIndexTable;
- if (schemaName != null && !schemaName.isEmpty()) {
- qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
- } else {
- qIndexTable = indexTable;
- }
- final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
-
- final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
-
- long maxTimeRange = pindexTable.getTimeStamp() + 1;
- // this is set to ensure index tables remains consistent post population.
+ private Job configureJobForAsyncIndex()
- if (pdataTable.isTransactional()) {
- configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
- Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
- configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name());
- }
- configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
- Long.toString(maxTimeRange));
-
- // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
- // computed from the qDataTable name.
- String physicalIndexTable = pindexTable.getPhysicalName().getString();
-
+ throws Exception {
+ String physicalIndexTable = pIndexTable.getPhysicalName().getString();
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
final PostIndexDDLCompiler ddlCompiler =
- new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
- ddlCompiler.compile(pindexTable);
-
+ new PostIndexDDLCompiler(pConnection, new TableRef(pDataTable));
+ ddlCompiler.compile(pIndexTable);
final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
final String selectQuery = ddlCompiler.getSelectQuery();
final String upsertQuery =
@@ -410,6 +418,7 @@ public class IndexTool extends Configured implements Tool {
configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
+
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
indexColumns.toArray(new String[indexColumns.size()]));
if (tenantId != null) {
@@ -419,25 +428,22 @@ public class IndexTool extends Configured implements Tool {
PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+ fs = outputPath.getFileSystem(configuration);
+ fs.delete(outputPath, true);
final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
final Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(IndexTool.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- if (outputPath != null) {
- fs = outputPath.getFileSystem(configuration);
- fs.delete(outputPath, true);
- FileOutputFormat.setOutputPath(job, outputPath);
- }
+ FileOutputFormat.setOutputPath(job, outputPath);
if (!useSnapshot) {
- PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
- selectQuery);
+ PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery);
} else {
HBaseAdmin admin = null;
String snapshotName;
try {
admin = pConnection.getQueryServices().getAdmin();
- String pdataTableName = pdataTable.getName().getString();
+ String pdataTableName = pDataTable.getName().getString();
snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString();
admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
} finally {
@@ -452,17 +458,47 @@ public class IndexTool extends Configured implements Tool {
// set input for map reduce job using hbase snapshots
PhoenixMapReduceUtil
- .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery);
+ .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery);
}
TableMapReduceUtil.initCredentials(job);
if (useDirectApi) {
- return configureSubmittableJobUsingDirectApi(job, false);
+ job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+ return configureSubmittableJobUsingDirectApi(job);
} else {
return configureRunnableJobUsingBulkLoad(job, outputPath);
-
}
-
+ }
+
+ private Job configureJobForServerBuildIndex()
+ throws Exception {
+
+ PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
+ PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+
+ String physicalIndexTable = pIndexTable.getPhysicalName().getString();
+
+ PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
+ PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
+
+ fs = outputPath.getFileSystem(configuration);
+ fs.delete(outputPath, true);
+
+ final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
+ final Job job = Job.getInstance(configuration, jobName);
+ job.setJarByClass(IndexTool.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+ qDataTable, "");
+
+ TableMapReduceUtil.initCredentials(job);
+ job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+ return configureSubmittableJobUsingDirectApi(job);
}
/**
@@ -496,12 +532,9 @@ public class IndexTool extends Configured implements Tool {
* @return
* @throws Exception
*/
- private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild)
+ private Job configureSubmittableJobUsingDirectApi(Job job)
throws Exception {
- if (!isPartialRebuild) {
- //Don't configure mapper for partial build as it is configured already
- job.setMapperClass(PhoenixIndexImportDirectMapper.class);
- }
+
job.setReducerClass(PhoenixIndexImportDirectReducer.class);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -520,6 +553,10 @@ public class IndexTool extends Configured implements Tool {
}
+ public Job getJob() {
+ return job;
+ }
+
@Override
public int run(String[] args) throws Exception {
Connection connection = null;
@@ -532,64 +569,75 @@ public class IndexTool extends Configured implements Tool {
printHelpAndExit(e.getMessage(), getOptions());
}
final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
- final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
- final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
- final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
- final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
- final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
- String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
- boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
- boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
- byte[][] splitKeysBeforeJob = null;
- boolean isLocalIndexBuild = false;
- PTable pindexTable = null;
- String tenantId = null;
+ tenantId = null;
if (useTenantId) {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
connection = ConnectionUtil.getInputConnection(configuration);
+ schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+ dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+ indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+ qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+ pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+ useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+ String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+ boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+
+ byte[][] splitKeysBeforeJob = null;
+ isLocalIndexBuild = false;
+ pIndexTable = null;
+
+ connection = ConnectionUtil.getInputConnection(configuration);
+
if (indexTable != null) {
if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
throw new IllegalArgumentException(String.format(
" %s is not an index table for %s for this connection", indexTable, qDataTable));
}
- pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
+ pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
+
+ if (schemaName != null && !schemaName.isEmpty()) {
+ qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
+ } else {
+ qIndexTable = indexTable;
+ }
htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(pindexTable.getPhysicalName().getBytes());
- if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+ .getTable(pIndexTable.getPhysicalName().getBytes());
+
+ if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
isLocalIndexBuild = true;
splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
}
// presplit the index table
boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
- boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables
- if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType())
+ boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
+ if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType())
&& (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate));
- splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate);
+
+ splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration);
}
}
-
- PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
Path outputPath = null;
FileSystem fs = null;
if (basePath != null) {
- outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null
- ? pdataTable.getPhysicalName().getString() : pindexTable.getPhysicalName().getString());
+ outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null
+ ? pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString());
fs = outputPath.getFileSystem(configuration);
fs.delete(outputPath, true);
}
-
- Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable,
- useDirectApi, isPartialBuild, useSnapshot, tenantId);
+
+ job = new JobFactory(connection, configuration, outputPath).getJob();
+
if (!isForeground && useDirectApi) {
LOG.info("Running Index Build in Background - Submit async and exit");
job.submit();
@@ -635,32 +683,29 @@ public class IndexTool extends Configured implements Tool {
}
}
-
- private void splitIndexTable(PhoenixConnection pConnection, String qDataTable,
- PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate)
+ private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
- final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable);
int numRegions;
try (HTable hDataTable =
(HTable) pConnection.getQueryServices()
- .getTable(pdataTable.getPhysicalName().getBytes())) {
+ .getTable(pDataTable.getPhysicalName().getBytes())) {
numRegions = hDataTable.getRegionLocator().getStartKeys().length;
if (autosplit && !(numRegions > autosplitNumRegions)) {
LOG.info(String.format(
"Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
- pindexTable.getPhysicalName(), numRegions, autosplitNumRegions));
+ pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
return; // do nothing if # of regions is too low
}
}
// build a tablesample query to fetch index column values from the data table
- DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable);
+ DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
List<String> dataColNames = colNames.getDataColNames();
final String dataSampleQuery =
QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
Hint.NO_INDEX, true);
- IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection);
+ IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection);
ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable();
try (final PhoenixResultSet rs =
pConnection.createStatement().executeQuery(dataSampleQuery)
@@ -684,7 +729,7 @@ public class IndexTool extends Configured implements Tool {
splitPoints[splitIdx++] = b.getRightBoundExclusive();
}
// drop table and recreate with appropriate splits
- TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes());
+ TableName indexTN = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
HTableDescriptor descriptor = admin.getTableDescriptor(indexTN);
admin.disableTable(indexTN);
admin.deleteTable(indexTN);
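
For context, the refactored tool is still driven the same way as before; a hedged sketch of a programmatic invocation (the short option strings -it, -direct, and -op are not visible in this hunk's context, so treat the exact flags as assumptions inferred from the options referenced above):

    IndexTool tool = new IndexTool();
    tool.setConf(HBaseConfiguration.create());
    // -direct routes through the direct API; with a local or non-transactional
    // global index this now selects the server-side build path.
    int status = tool.run(new String[] {
            "-s", "MY_SCHEMA", "-dt", "MY_TABLE", "-it", "MY_IDX",
            "-direct", "-op", "/tmp/MY_IDX_build" });
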
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
new file mode 100644
index 0000000..34bcc9b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that does not do much, as region servers build the index directly from the data table regions
+ */
+public class PhoenixServerBuildIndexMapper extends
+ Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PhoenixServerBuildIndexMapper.class);
+
+ @Override
+ protected void setup(final Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ }
+
+ @Override
+ protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+ throws IOException, InterruptedException {
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+ // Make sure progress is reported to Application Master.
+ context.progress();
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), new IntWritable(0));
+ super.cleanup(context);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 8fa21fe..b41611f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -122,6 +122,10 @@ public final class PhoenixConfigurationUtil {
public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name";
+ public static final String INDEX_TOOL_DATA_TABLE_NAME = "phoenix.mr.index_tool.data.table.name";
+
+ public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";
+
public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";
public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
@@ -569,6 +573,16 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(SCRUTINY_INDEX_TABLE_NAME);
}
+ public static void setIndexToolDataTableName(Configuration configuration, String qDataTableName) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(qDataTableName);
+ configuration.set(INDEX_TOOL_DATA_TABLE_NAME, qDataTableName);
+ }
+
+ public static String getIndexToolDataTableName(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.get(INDEX_TOOL_DATA_TABLE_NAME);
+ }
public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) {
Preconditions.checkNotNull(configuration);
@@ -581,6 +595,17 @@ public final class PhoenixConfigurationUtil {
return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE));
}
+ public static void setIndexToolIndexTableName(Configuration configuration, String qIndexTableName) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(qIndexTableName);
+ configuration.set(INDEX_TOOL_INDEX_TABLE_NAME, qIndexTableName);
+ }
+
+ public static String getIndexToolIndexTableName(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
+ }
+
public static void setScrutinySourceTable(Configuration configuration,
SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
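
The new IndexTool keys round-trip the same way as the scrutiny keys above; a minimal sketch (table names are placeholders):

    Configuration conf = HBaseConfiguration.create();
    PhoenixConfigurationUtil.setIndexToolDataTableName(conf, "MY_SCHEMA.MY_TABLE");
    PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, "MY_SCHEMA.MY_IDX");
    // PhoenixServerBuildIndexInputFormat reads these back via the static
    // imports shown earlier in this patch.
    String dataTable = PhoenixConfigurationUtil.getIndexToolDataTableName(conf);
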
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index ecede80..f8625da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
@@ -70,6 +71,23 @@ public final class PhoenixMapReduceUtil {
/**
*
* @param job
+ * @param inputClass DBWritable class
+ * @param inputFormatClass InputFormat class
+ * @param tableName Input table name
+ * @param inputQuery Select query
+ */
+
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final String tableName, final String inputQuery) {
+ final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
+ PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+ PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+ }
+
+ /**
+ *
+ * @param job
* @param inputClass DBWritable class
* @param snapshotName The name of a snapshot (of a table) to read from
* @param tableName Input table name
@@ -140,6 +158,15 @@ public final class PhoenixMapReduceUtil {
return configuration;
}
+ private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
+ final Class<? extends InputFormat> inputFormatClass, final String tableName){
+ job.setInputFormatClass(inputFormatClass);
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+ return configuration;
+ }
+
/**
* A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
* @param job MapReduce Job
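
A sketch of wiring the new overload, mirroring configureJobForServerBuildIndex earlier in this patch (conf and the table name are placeholders):

    Job job = Job.getInstance(conf, "server-side index build");
    PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
            PhoenixServerBuildIndexInputFormat.class, "MY_SCHEMA.MY_TABLE", "");
    job.setMapperClass(PhoenixServerBuildIndexMapper.class);
    // The empty inputQuery is what IndexTool passes; the InputFormat builds
    // its own QueryPlan via ServerBuildIndexCompiler.
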
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7cf2e21..6c5d939 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -156,6 +156,7 @@ import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildIndexCompiler;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -1390,16 +1391,17 @@ public class MetaDataClient {
}
private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
- MutationPlan mutationPlan;
if (index.getIndexType() == IndexType.LOCAL) {
PostLocalIndexDDLCompiler compiler =
new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
- mutationPlan = compiler.compile(index);
- } else {
+ return compiler.compile(index);
+ } else if (dataTableRef.getTable().isTransactional()){
PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
- mutationPlan = compiler.compile(index);
+ return compiler.compile(index);
+ } else {
+ ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
+ return compiler.compile(index);
}
- return mutationPlan;
}
private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
@@ -1741,6 +1743,10 @@ public class MetaDataClient {
if (connection.getSCN() != null) {
return buildIndexAtTimeStamp(table, statement.getTable());
}
+
+ String dataTableFullName = SchemaUtil.getTableName(
+ tableRef.getTable().getSchemaName().getString(),
+ tableRef.getTable().getTableName().getString());
return buildIndex(table, tableRef);
}