Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/09/16 17:43:28 UTC

[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5474 IndexTool should report the number of rows built

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
     new 6687130  PHOENIX-5474 IndexTool should report the number of rows built
6687130 is described below

commit 6687130521ec7ffbb4d2eaeb7457f1fd2ba75d5b
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Sep 12 14:35:19 2019 -0700

    PHOENIX-5474 IndexTool should report the number of rows built
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 46 ++++++++++++++++-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  2 +-
 .../index/PhoenixServerBuildIndexDBWritable.java   | 59 ++++++++++++++++++++++
 .../index/PhoenixServerBuildIndexMapper.java       | 24 ++-------
 4 files changed, 108 insertions(+), 23 deletions(-)
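In short: the server-side index build now returns the number of rows it scanned
as a count(*)-style aggregate, and the mapper folds that value into the MR job's
INPUT_RECORDS counter. A minimal sketch of reading the reported count after a
run; buildIndexToolArgs() is a hypothetical helper standing in for the usual
CLI flag construction, while the counter lookup mirrors the new test below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.PhoenixJobCounters;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    // args: the usual IndexTool CLI flags (schema, data table, index table,
    // direct mode, output path); construction omitted here.
    String[] args = buildIndexToolArgs();  // hypothetical helper
    IndexTool tool = new IndexTool();
    int status = ToolRunner.run(new Configuration(), tool, args);
    // After the job completes, the counter holds the number of rows built.
    long rowsBuilt = tool.getJob().getCounters()
            .findCounter(PhoenixJobCounters.INPUT_RECORDS).getValue();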

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 069aced..de47129 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -59,6 +60,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -244,6 +246,47 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
     }
 
     @Test
+    public void testBuildSecondaryIndexAndScrutinize() throws Exception {
+        // This test builds non-transactional global indexes with the direct API
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmtString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmtString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert NROWS rows
+            final int NROWS = 1000;
+            for (int i = 0; i < NROWS; i++) {
+                upsertRow(stmt1, i);
+            }
+            conn.commit();
+            String stmtString2 =
+                    String.format(
+                            "CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ",
+                            (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+        }
+    }
+
+    @Test
     public void testIndexToolWithTenantId() throws Exception {
         if (!useTenantId) { return;}
         String tenantId = generateUniqueName();
@@ -587,7 +630,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
         runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
     }
 
-    public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+    public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String tenantId, int expectedStatus,
             String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
@@ -604,5 +647,6 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
             verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
         }
         assertEquals(expectedStatus, status);
+        return indexingTool;
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index c8ff3bb..dcf4d4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -512,7 +512,7 @@ public class IndexTool extends Configured implements Tool {
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             FileOutputFormat.setOutputPath(job, outputPath);
 
-            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+            PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
                             qDataTable, "");
 
             TableMapReduceUtil.initCredentials(job);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java
new file mode 100644
index 0000000..4054918
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexDBWritable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+
+/**
+ * A {@link DBWritable} class that reads the number of rows that have been built.
+ */
+public class PhoenixServerBuildIndexDBWritable implements DBWritable {
+    private long rowCount;
+
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+        Tuple row = resultSet.unwrap(PhoenixResultSet.class).getCurrentRow();
+        Cell kv = row.getValue(0);
+        ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        // A single Cell will be returned with the count(*) - we decode that here
+        rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+}
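
For reference, the decode in readFields() is the inverse of PLong#toBytes, so a
self-contained round-trip (the value 1000L is illustrative) looks like this:

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.types.PLong;

    // Encode a count the way the server returns it, then decode it exactly
    // as readFields() does above.
    byte[] encoded = PLong.INSTANCE.toBytes(1000L);
    ImmutableBytesWritable ptr = new ImmutableBytesWritable(encoded);
    long decoded = PLong.INSTANCE.getCodec().decodeLong(ptr, SortOrder.getDefault());
    assert decoded == 1000L;
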
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
index 35173bc..5987a02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
@@ -18,37 +18,19 @@
 package org.apache.phoenix.mapreduce.index;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.PhoenixJobCounters;
-import org.apache.phoenix.mapreduce.util.ConnectionUtil;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
 
 /**
  * Mapper that does not do much, as region servers actually build the index from the data table regions directly
  */
 public class PhoenixServerBuildIndexMapper extends
-        Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+        Mapper<NullWritable, PhoenixServerBuildIndexDBWritable, ImmutableBytesWritable, IntWritable> {
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -56,9 +38,9 @@ public class PhoenixServerBuildIndexMapper extends
     }
 
     @Override
-    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+    protected void map(NullWritable key, PhoenixServerBuildIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
-        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(record.getRowCount());
         // Make sure progress is reported to Application Master.
         context.progress();
     }
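
Because each input record now carries an aggregate row count rather than a
single row, the counter advances by getRowCount() instead of by 1. A minimal
arithmetic sketch with hypothetical per-region counts:

    // Three aggregate records (hypothetical counts) and what the new
    // increment(record.getRowCount()) accumulates versus the old increment(1):
    long[] regionCounts = {400L, 350L, 250L};
    long newTotal = 0, oldTotal = 0;
    for (long c : regionCounts) {
        newTotal += c;   // new behavior: add the per-record row count
        oldTotal += 1;   // old behavior: one increment per record seen
    }
    // newTotal == 1000 (rows built); oldTotal == 3 (records seen)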