You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/10/30 15:59:13 UTC
[1/2] phoenix git commit: PHOENIX-4329 Test IndexScrutinyTool while
table is taking writes (Vincent Poon)
Repository: phoenix
Updated Branches:
refs/heads/master 60a9b099e -> 0c38f493c
PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0c38f493
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0c38f493
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0c38f493
Branch: refs/heads/master
Commit: 0c38f493ca4e35eefa2297f62cbe56cca47bb81d
Parents: 438ac56
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 29 15:20:23 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 30 08:59:00 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/IndexScrutinyToolIT.java | 101 ++++++++++++++++++-
1 file changed, 96 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0c38f493/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 10595a7..cbce7b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -36,6 +36,9 @@ import java.util.Properties;
import java.util.Random;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
@@ -43,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
@@ -103,6 +107,7 @@ public class IndexScrutinyToolIT extends BaseTest {
private PreparedStatement indexTableUpsertStmt;
private long testTime;
+ private Properties props;
@Parameterized.Parameters
public static Collection<Object[]> data() {
@@ -120,8 +125,11 @@ public class IndexScrutinyToolIT extends BaseTest {
@BeforeClass
public static void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMap();
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ Map<String, String> serverProps = Maps.newHashMap();
+ // Disable periodic major compactions on the test cluster.
+ // NOTE(review): presumably so that delete markers and overwritten cell versions are not
+ // purged while a point-in-time scrutiny is running - confirm.
+ serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+ // No client-side overrides are needed, so the client properties stay empty.
+ Map<String, String> clientProps = Maps.newHashMap();
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
/**
@@ -133,7 +141,7 @@ public class IndexScrutinyToolIT extends BaseTest {
createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
createTestTable(getUrl(),
String.format(indexTableDdl, indexTableName, dataTableFullName));
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = DriverManager.getConnection(getUrl(), props);
String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
@@ -141,6 +149,7 @@ public class IndexScrutinyToolIT extends BaseTest {
indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
conn.setAutoCommit(false);
testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
}
@After
@@ -177,6 +186,77 @@ public class IndexScrutinyToolIT extends BaseTest {
}
/**
+     * Tests running a scrutiny while updates and deletes are happening.
+     * <p>
+     * The scrutiny timestamp (CURRENT_SCN) is captured after the initial 1000 rows have been
+     * committed and before the background writers are scheduled, so the scrutiny shouldn't
+     * report any issue no matter what the writers do afterwards.
+     */
+    @Test
+    public void testScrutinyWhileTakingWrites() throws Exception {
+        // seed the data table with ids 0..999 and commit them in one batch
+        int id = 0;
+        while (id < 1000) {
+            int index = 1;
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setString(index++, "name-" + id);
+            dataTableUpsertStmt.setInt(index++, id);
+            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+            dataTableUpsertStmt.executeUpdate();
+            id++;
+        }
+        conn.commit();
+
+        //CURRENT_SCN for scrutiny
+        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        // launch background upserts and deletes
+        final Random random = new Random(0);
+        Runnable backgroundUpserts = new Runnable() {
+            @Override
+            public void run() {
+                int idToUpsert = random.nextInt(1000);
+                // each run uses its own connection; the statement is closed together with it
+                try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                        PreparedStatement dataPS =
+                                conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName))) {
+                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                    conn.commit();
+                } catch (SQLException e) {
+                    // best effort: a failed background write must not abort the periodic task
+                    e.printStackTrace();
+                }
+            }
+        };
+        Runnable backgroundDeletes = new Runnable() {
+            @Override
+            public void run() {
+                int idToDelete = random.nextInt(1000);
+                // the delete is issued against the index table directly
+                String deleteSql =
+                        String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
+                                + idToDelete;
+                try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                        java.sql.Statement stmt = conn.createStatement()) {
+                    stmt.executeUpdate(deleteSql);
+                    conn.commit();
+                } catch (SQLException e) {
+                    // best effort: a failed background write must not abort the periodic task
+                    e.printStackTrace();
+                }
+            }
+        };
+        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+        try {
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
+                TimeUnit.MILLISECONDS);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
+                TimeUnit.MILLISECONDS);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs =
+                    runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
+                        scrutinyTS);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+        } finally {
+            // always stop the background writers, even when the scrutiny throws or an
+            // assertion fails; otherwise the pool threads keep running after this test
+            scheduledThreadPool.shutdown();
+            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+        }
+    }
+
+ /**
* Tests an index with the same # of rows as the data table, but one of the index rows is
* incorrect. Scrutiny should report the invalid rows.
*/
@@ -570,6 +650,13 @@ public class IndexScrutinyToolIT extends BaseTest {
private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
Long maxOutputRows) {
+ // Convenience overload for callers without an explicit scrutiny timestamp: passes
+ // Long.MAX_VALUE, the value that used to be hard-coded for the "-t" argument.
+ return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+ outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
+ }
+
+ private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+ SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
+ Long maxOutputRows, Long scrutinyTs) {
final List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -591,7 +678,7 @@ public class IndexScrutinyToolIT extends BaseTest {
args.add(outputDir);
}
args.add("-t");
- args.add(String.valueOf(Long.MAX_VALUE));
+ args.add(String.valueOf(scrutinyTs));
args.add("-run-foreground");
if (batchSize != null) {
args.add("-b");
@@ -619,6 +706,10 @@ public class IndexScrutinyToolIT extends BaseTest {
return args.toArray(new String[0]);
}
+    /**
+     * Runs the scrutiny tool with {@code SourceTable.BOTH}, using {@code scrutinyTS} as the
+     * scrutiny timestamp instead of {@code Long.MAX_VALUE}. No batch size, output format or
+     * output row limit is passed, and invalid-row output is switched off.
+     *
+     * @return the jobs returned by {@code runScrutiny} for the generated arguments
+     */
+    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName,
+            String indexTableName, Long scrutinyTS) throws Exception {
+        final String[] cmdArgs =
+                getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+                    false, null, null, scrutinyTS);
+        return runScrutiny(cmdArgs);
+    }
+
private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
// Delegates to the five-argument overload with no batch size and no source table specified.
return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
}
@@ -632,7 +723,7 @@ public class IndexScrutinyToolIT extends BaseTest {
Long batchSize, SourceTable sourceTable) throws Exception {
final String[] cmdArgs =
getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
- null, null);
+ null, null, Long.MAX_VALUE);
return runScrutiny(cmdArgs);
}
[2/2] phoenix git commit: PHOENIX-4277 Treat delete markers
consistently with puts for point-in-time scans
Posted by ja...@apache.org.
PHOENIX-4277 Treat delete markers consistently with puts for point-in-time scans
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/438ac567
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/438ac567
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/438ac567
Branch: refs/heads/master
Commit: 438ac5676e8e8f0a69875d9b91acaf5c8ac6201c
Parents: 60a9b09
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 29 15:19:23 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 30 08:59:00 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/PointInTimeQueryIT.java | 2 +-
.../hadoop/hbase/regionserver/ScanInfoUtil.java | 35 ++++++++++++++++++++
.../coprocessor/BaseScannerRegionObserver.java | 21 ++++++++++++
.../apache/phoenix/util/TransactionUtil.java | 7 ++--
4 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
index c53e523..ed3e8a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
@@ -63,7 +63,7 @@ public class PointInTimeQueryIT extends BaseQueryIT {
public PointInTimeQueryIT(String idxDdl, boolean columnEncoded)
throws Exception {
- // These queries fail without KEEP_DELETED_CELLS=true
- super(idxDdl, columnEncoded, true);
+ // These queries used to fail without KEEP_DELETED_CELLS=true. With PHOENIX-4277 they are
+ // expected to pass without it, so the flag is switched off here.
+ // NOTE(review): the third argument is presumably the keep-deleted-cells flag - confirm
+ // against BaseQueryIT.
+ super(idxDdl, columnEncoded, false);
}
@Test
http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
new file mode 100644
index 0000000..9885c78
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeepDeletedCells;
+
+/**
+ * Static helpers for inspecting and copying HBase {@link ScanInfo} instances.
+ * NOTE(review): declared in the HBase regionserver package, presumably to reach ScanInfo
+ * members that are not public - confirm.
+ */
+public class ScanInfoUtil {
+
+    private ScanInfoUtil() {
+        // static helpers only; never instantiated
+    }
+
+    /**
+     * @return {@code true} unless the keep-deleted-cells setting of {@code scanInfo} is
+     *         {@link KeepDeletedCells#FALSE}
+     */
+    public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
+        final KeepDeletedCells setting = scanInfo.getKeepDeletedCells();
+        return setting != KeepDeletedCells.FALSE;
+    }
+
+    /**
+     * Builds a new {@link ScanInfo} that carries over the configuration, family, max versions,
+     * TTL, time-to-purge-deletes and comparator of {@code scanInfo}, but forces
+     * {@link KeepDeletedCells#TRUE} and raises min versions to at least 1.
+     */
+    public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
+        return new ScanInfo(
+                scanInfo.getConfiguration(),
+                scanInfo.getFamily(),
+                Math.max(scanInfo.getMinVersions(), 1),
+                scanInfo.getMaxVersions(),
+                scanInfo.getTtl(),
+                KeepDeletedCells.TRUE,
+                scanInfo.getTimeToPurgeDeletes(),
+                scanInfo.getComparator());
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d3b257b..519e419 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.List;
+import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -30,10 +32,15 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -48,6 +55,7 @@ import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -350,4 +358,17 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
+    /**
+     * Swaps in a store scanner that keeps deleted cells for point-in-time scans.
+     * <p>
+     * The incoming scanner {@code s} is returned untouched when the scan is raw, when the store
+     * already keeps deleted cells, when the upper bound of the scan's time range is
+     * {@link HConstants#LATEST_TIMESTAMP}, or when that bound is a transactional timestamp.
+     * Otherwise a new {@link StoreScanner} is opened over a copy of the store's scan info that
+     * has keep-deleted-cells enabled, using the region's read point for the scan's isolation
+     * level.
+     */
+    @Override
+    public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+            final KeyValueScanner s) throws IOException {
+        // guard clauses are kept in the same order as the original short-circuit condition
+        if (scan.isRaw()) {
+            return s;
+        }
+        if (ScanInfoUtil.isKeepDeletedCells(store.getScanInfo())) {
+            return s;
+        }
+        final long upperBound = scan.getTimeRange().getMax();
+        if (upperBound == HConstants.LATEST_TIMESTAMP
+                || TransactionUtil.isTransactionalTimestamp(upperBound)) {
+            return s;
+        }
+
+        final ScanInfo keepDeletesInfo =
+                ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+        return new StoreScanner(store, keepDeletesInfo, scan, targetCols,
+                c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/438ac567/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 01b775e..a99c700 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
@@ -33,11 +31,16 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TephraTransactionTable;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.tephra.util.TxUtils;
public class TransactionUtil {
private TransactionUtil() {
// private constructor: this class is not meant to be instantiated from outside
}
+    /**
+     * Reports whether {@code ts} is a transactional timestamp, i.e. one that Tephra's
+     * {@code TxUtils.isPreExistingVersion} does not classify as a pre-existing version.
+     *
+     * @param ts the timestamp to classify
+     * @return the negation of {@code TxUtils.isPreExistingVersion(ts)}
+     */
+    public static boolean isTransactionalTimestamp(long ts) {
+        final boolean preExisting = TxUtils.isPreExistingVersion(ts);
+        return !preExisting;
+    }
+
// Returns true when the cell's value matches the empty byte array.
// NOTE(review): presumably the empty value is the marker written for transactional deletes - confirm.
public static boolean isDelete(Cell cell) {
return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
}