You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/09/16 17:30:43 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5474 IndexTool
should report the number of rows built
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new 443e277 PHOENIX-5474 IndexTool should report the number of rows built
443e277 is described below
commit 443e2775c8010cee86fe457ecd891b9734d0ba37
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Sep 12 14:35:19 2019 -0700
PHOENIX-5474 IndexTool should report the number of rows built
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 46 ++++++++++++++++-
.../apache/phoenix/mapreduce/index/IndexTool.java | 2 +-
.../index/PhoenixServerBuildIndexDBWritable.java | 59 ++++++++++++++++++++++
.../index/PhoenixServerBuildIndexMapper.java | 24 ++-------
4 files changed, 108 insertions(+), 23 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 185b5a0..8a842ff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -61,6 +62,7 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -253,6 +255,47 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testBuildSecondaryIndexAndScrutinize() throws Exception {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional || !directApi || useSnapshot) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String stmString1 =
+ "CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ + tableDDLOptions;
+ conn.createStatement().execute(stmString1);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ // Insert NROWS rows
+ final int NROWS = 1000;
+ for (int i = 0; i < NROWS; i++) {
+ upsertRow(stmt1, i);
+ }
+ conn.commit();
+ String stmtString2 =
+ String.format(
+ "CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ",
+ (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
+ conn.createStatement().execute(stmtString2);
+
+ // Run the index MR job and verify that the index table is built correctly
+ IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ }
+ }
+
+ @Test
public void testIndexToolWithTenantId() throws Exception {
if (!useTenantId) { return;}
String tenantId = generateUniqueName();
@@ -600,7 +643,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
}
- public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+ public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
String dataTableName, String indexTableName, String tenantId, int expectedStatus,
String... additionalArgs) throws Exception {
IndexTool indexingTool = new IndexTool();
@@ -617,5 +660,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
}
assertEquals(expectedStatus, status);
+ return indexingTool;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 8565e08..7a37e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -534,7 +534,7 @@ public class IndexTool extends Configured implements Tool {
FileOutputFormat.setOutputPath(job, outputPath);
}
- PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+ PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
qDataTable, "");
TableMapReduceUtil.initCredentials(job);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java
new file mode 100644
index 0000000..4054918
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+
+/**
+ * A {@link DBWritable} class that reads the number of rows that have been built.
+ *
+ *
+ */
+public class PhoenixServerBuildIndexDBWritable implements DBWritable {
+ private long rowCount;
+
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ Tuple row = resultSet.unwrap(PhoenixResultSet.class).getCurrentRow();
+ Cell kv = row.getValue(0);
+ ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
index 35173bc..5987a02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
@@ -18,37 +18,19 @@
package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixJobCounters;
-import org.apache.phoenix.mapreduce.util.ConnectionUtil;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
/**
* Mapper that does not do much as regions servers actually build the index from the data table regions directly
*/
public class PhoenixServerBuildIndexMapper extends
- Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+ Mapper<NullWritable, PhoenixServerBuildIndexDBWritable, ImmutableBytesWritable, IntWritable> {
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
@@ -56,9 +38,9 @@ public class PhoenixServerBuildIndexMapper extends
}
@Override
- protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+ protected void map(NullWritable key, PhoenixServerBuildIndexDBWritable record, Context context)
throws IOException, InterruptedException {
- context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(record.getRowCount());
// Make sure progress is reported to Application Master.
context.progress();
}