Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/10/30 15:59:13 UTC

[1/2] phoenix git commit: PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)

Repository: phoenix
Updated Branches:
  refs/heads/master 60a9b099e -> 0c38f493c


PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0c38f493
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0c38f493
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0c38f493

Branch: refs/heads/master
Commit: 0c38f493ca4e35eefa2297f62cbe56cca47bb81d
Parents: 438ac56
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 29 15:20:23 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 30 08:59:00 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/IndexScrutinyToolIT.java    | 101 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0c38f493/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 10595a7..cbce7b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -36,6 +36,9 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
@@ -43,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -103,6 +107,7 @@ public class IndexScrutinyToolIT extends BaseTest {
     private PreparedStatement indexTableUpsertStmt;
 
     private long testTime;
+    private Properties props;
 
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
@@ -120,8 +125,11 @@ public class IndexScrutinyToolIT extends BaseTest {
 
     @BeforeClass
     public static void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMap();
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        Map<String, String> serverProps = Maps.newHashMap();
+        //disable major compactions
+        serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+        Map<String, String> clientProps = Maps.newHashMap();
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
     /**
@@ -133,7 +141,7 @@ public class IndexScrutinyToolIT extends BaseTest {
         createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
         createTestTable(getUrl(),
             String.format(indexTableDdl, indexTableName, dataTableFullName));
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         conn = DriverManager.getConnection(getUrl(), props);
         String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
         dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
@@ -141,6 +149,7 @@ public class IndexScrutinyToolIT extends BaseTest {
         indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
         conn.setAutoCommit(false);
         testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
     }
 
     @After
@@ -177,6 +186,77 @@ public class IndexScrutinyToolIT extends BaseTest {
     }
 
     /**
+     * Tests running a scrutiny while updates and deletes are happening.
+     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+     */
+    @Test
+    public void testScrutinyWhileTakingWrites() throws Exception {
+        int id = 0;
+        while (id < 1000) {
+            int index = 1;
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setString(index++, "name-" + id);
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+            dataTableUpsertStmt.executeUpdate();
+            id++;
+        }
+        conn.commit();
+
+        //CURRENT_SCN for scrutiny
+        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        // launch background upserts and deletes
+        final Random random = new Random(0);
+        Runnable backgroundUpserts = new Runnable() {
+            @Override
+            public void run() {
+                int idToUpsert = random.nextInt(1000);
+                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                    PreparedStatement dataPS =
+                            conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                    conn.commit();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        Runnable backgroundDeletes = new Runnable() {
+            @Override
+            public void run() {
+                int idToDelete = random.nextInt(1000);
+                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                    String deleteSql =
+                            String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
+                                    + idToDelete;
+                    conn.createStatement().executeUpdate(deleteSql);
+                    conn.commit();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+        scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
+            TimeUnit.MILLISECONDS);
+        scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
+            TimeUnit.MILLISECONDS);
+
+        // scrutiny should report everything as ok
+        List<Job> completedJobs =
+                runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
+                    scrutinyTS);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+        scheduledThreadPool.shutdown();
+        scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+    }
+
+    /**
     * Tests an index with the same number of rows as the data table, but one of the index rows is
     * incorrect. Scrutiny should report the invalid rows.
      */
@@ -570,6 +650,13 @@ public class IndexScrutinyToolIT extends BaseTest {
     private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
             SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
             Long maxOutputRows) {
+        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+            outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
+    }
+
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
+            Long maxOutputRows, Long scrutinyTs) {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -591,7 +678,7 @@ public class IndexScrutinyToolIT extends BaseTest {
             args.add(outputDir);
         }
         args.add("-t");
-        args.add(String.valueOf(Long.MAX_VALUE));
+        args.add(String.valueOf(scrutinyTs));
         args.add("-run-foreground");
         if (batchSize != null) {
             args.add("-b");
@@ -619,6 +706,10 @@ public class IndexScrutinyToolIT extends BaseTest {
         return args.toArray(new String[0]);
     }
 
+    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
+    }
+
     private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
         return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
     }
@@ -632,7 +723,7 @@ public class IndexScrutinyToolIT extends BaseTest {
             Long batchSize, SourceTable sourceTable) throws Exception {
         final String[] cmdArgs =
                 getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
-                    null, null);
+                    null, null, Long.MAX_VALUE);
         return runScrutiny(cmdArgs);
     }
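
A note on the mechanism: the test pins the scrutiny to scrutinyTS through the tool's -t
argument, which corresponds to setting CURRENT_SCN on the Phoenix connections the job
opens, so writes committed after that timestamp are simply invisible to it. A minimal
standalone sketch of the same point-in-time read, assuming a local Phoenix URL and a
hypothetical table MY_TABLE (names and class below are illustrative, not from the commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

import org.apache.phoenix.util.PhoenixRuntime;

public class PointInTimeReadSketch {
    public static void main(String[] args) throws Exception {
        long scrutinyTs = System.currentTimeMillis();
        Properties props = new Properties();
        // "CurrentSCN": every scan on this connection reads as of scrutinyTs
        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scrutinyTs));
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
             ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM MY_TABLE")) {
            rs.next();
            // Rows upserted or deleted after scrutinyTs do not affect this count,
            // which is why the background writers above cannot break the scrutiny.
            System.out.println("rows as of " + scrutinyTs + ": " + rs.getLong(1));
        }
    }
}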
 


[2/2] phoenix git commit: PHOENIX-4277 Treat delete markers consistently with puts for point-in-time scans

Posted by ja...@apache.org.
PHOENIX-4277 Treat delete markers consistently with puts for point-in-time scans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/438ac567
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/438ac567
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/438ac567

Branch: refs/heads/master
Commit: 438ac5676e8e8f0a69875d9b91acaf5c8ac6201c
Parents: 60a9b09
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 29 15:19:23 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 30 08:59:00 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/PointInTimeQueryIT.java     |  2 +-
 .../hadoop/hbase/regionserver/ScanInfoUtil.java | 35 ++++++++++++++++++++
 .../coprocessor/BaseScannerRegionObserver.java  | 21 ++++++++++++
 .../apache/phoenix/util/TransactionUtil.java    |  7 ++--
 4 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
index c53e523..ed3e8a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
@@ -63,7 +63,7 @@ public class PointInTimeQueryIT extends BaseQueryIT {
     public PointInTimeQueryIT(String idxDdl, boolean columnEncoded)
             throws Exception {
         // These queries fail without KEEP_DELETED_CELLS=true
-        super(idxDdl, columnEncoded, true);
+        super(idxDdl, columnEncoded, false);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
new file mode 100644
index 0000000..9885c78
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeepDeletedCells;
+
+public class ScanInfoUtil {
+    private ScanInfoUtil() {
+    }
+    
+    public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
+        return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
+    }
+    
+    public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
+        return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), Math.max(scanInfo.getMinVersions(), 1),
+                    scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE,
+                    scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator());
+    }
+}
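
The helper sits alongside HBase's region-server classes and rebuilds a store's ScanInfo
with KeepDeletedCells.TRUE and a minVersions of at least 1, so delete markers stay
readable for a single scan without changing the column family itself. For contrast, a
minimal sketch (HBase 1.x client API; table and family names are hypothetical) of the
per-family setting whose effect this turns on per-scan:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;

public class KeepDeletedCellsSketch {
    public static void main(String[] args) {
        HColumnDescriptor family = new HColumnDescriptor("0");
        // KEEP_DELETED_CELLS=TRUE keeps delete markers (and the cells they
        // mask) through flushes and compactions while within TTL, so a scan
        // whose max timestamp predates the delete can still see the puts.
        family.setKeepDeletedCells(KeepDeletedCells.TRUE);
        // minVersions >= 1 guarantees at least one version survives TTL
        // expiry, mirroring the Math.max(minVersions, 1) in the clone above.
        family.setMinVersions(1);
        HTableDescriptor table = new HTableDescriptor(TableName.valueOf("MY_TABLE"));
        table.addFamily(family);
        System.out.println(table);
    }
}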

http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d3b257b..519e419 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -30,10 +32,15 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -48,6 +55,7 @@ import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -350,4 +358,17 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
+    @Override
+    public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+        final KeyValueScanner s) throws IOException {
+
+      if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) {
+        return s;
+      }
+
+      ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+      return new StoreScanner(store, scanInfo, scan, targetCols,
+          c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+    }
 }
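
Taken together with the PointInTimeQueryIT change above, this hook means point-in-time
queries no longer depend on declaring KEEP_DELETED_CELLS=true on the table, which is why
that test now passes false. A hedged end-to-end illustration of the semantics being
restored, assuming a local Phoenix URL and a throwaway table T: a row deleted after time
t stays visible to a scan pinned at t, just as an overwritten row would.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

import org.apache.phoenix.util.PhoenixRuntime;

public class DeleteMarkerVisibilitySketch {
    private static final String URL = "jdbc:phoenix:localhost";

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(URL)) {
            conn.createStatement().execute(
                "CREATE TABLE IF NOT EXISTS T (ID INTEGER PRIMARY KEY, V VARCHAR)");
            conn.createStatement().execute("UPSERT INTO T VALUES (1, 'a')");
            conn.commit();
        }
        long beforeDelete = System.currentTimeMillis();
        Thread.sleep(10); // make sure the delete marker gets a later timestamp
        try (Connection conn = DriverManager.getConnection(URL)) {
            conn.createStatement().execute("DELETE FROM T WHERE ID = 1");
            conn.commit();
        }
        Properties props = new Properties();
        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(beforeDelete));
        try (Connection conn = DriverManager.getConnection(URL, props);
             ResultSet rs = conn.createStatement().executeQuery("SELECT V FROM T WHERE ID = 1")) {
            // With the preStoreScannerOpen hook, the delete marker written at a
            // later timestamp no longer hides this row from the pinned read.
            while (rs.next()) {
                System.out.println("visible at " + beforeDelete + ": " + rs.getString(1));
            }
        }
    }
}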

http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 01b775e..a99c700 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
@@ -33,11 +31,16 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TephraTransactionTable;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.tephra.util.TxUtils;
 
 public class TransactionUtil {
     private TransactionUtil() {
     }
     
+    public static boolean isTransactionalTimestamp(long ts) {
+        return !TxUtils.isPreExistingVersion(ts);
+    }
+    
     public static boolean isDelete(Cell cell) {
         return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
     }
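
The new isTransactionalTimestamp guard keeps the observer from rewriting scans issued by
the transaction runtime, which manages delete visibility itself. A minimal sketch of the
idea behind the check, assuming Tephra's convention that transaction write pointers are
roughly wall-clock milliseconds scaled by 10^6 (TxConstants.MAX_TX_PER_MS); the class and
ceiling constant here are illustrative, and the real test is TxUtils.isPreExistingVersion:

public final class TxTimestampSketch {
    // Tephra allots up to 10^6 transaction IDs per millisecond, so write
    // pointers are far larger than any plain epoch-millis cell timestamp.
    private static final long TX_PER_MS = 1_000_000L;
    // Any version above a generous "now in millis" ceiling cannot be an
    // ordinary HBase cell timestamp, so treat it as a transaction pointer.
    private static final long NON_TX_CEILING = (long) (System.currentTimeMillis() * 1.1);

    static boolean isTransactionalTimestamp(long ts) {
        return ts >= NON_TX_CEILING;
    }

    public static void main(String[] args) {
        long nowMs = System.currentTimeMillis();
        System.out.println(isTransactionalTimestamp(nowMs));             // false
        System.out.println(isTransactionalTimestamp(nowMs * TX_PER_MS)); // true
    }
}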