Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/27 18:35:46 UTC
[3/3] phoenix git commit: Secondary indexing with txns
Secondary indexing with txns
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a558e16
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a558e16
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a558e16
Branch: refs/heads/txn
Commit: 5a558e16cd7b1e882b274683aff6b655f952dac1
Parents: 826ebf5
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Mar 27 10:35:37 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Mar 27 10:35:37 2015 -0700
----------------------------------------------------------------------
.../EndToEndCoveredColumnsIndexBuilderIT.java | 12 +-
.../covered/example/FailWithoutRetriesIT.java | 198 +++---
.../apache/phoenix/hbase/index/ValueGetter.java | 4 +-
.../hbase/index/builder/BaseIndexBuilder.java | 170 +++--
.../hbase/index/builder/IndexBuildManager.java | 14 -
.../hbase/index/builder/IndexBuilder.java | 47 +-
.../phoenix/hbase/index/covered/Batch.java | 11 +-
.../covered/CoveredColumnsIndexBuilder.java | 491 ---------------
.../phoenix/hbase/index/covered/IndexCodec.java | 172 +++--
.../hbase/index/covered/LocalTableState.java | 453 +++++++------
.../hbase/index/covered/NonTxIndexBuilder.java | 405 ++++++++++++
.../phoenix/hbase/index/covered/TableState.java | 45 +-
.../hbase/index/covered/TxIndexBuilder.java | 247 ++++++++
.../index/covered/data/LazyValueGetter.java | 18 +-
.../example/CoveredColumnIndexCodec.java | 628 +++++++++----------
.../CoveredColumnIndexSpecifierBuilder.java | 4 +-
.../covered/example/CoveredColumnIndexer.java | 9 +-
.../index/covered/update/ColumnTracker.java | 3 +-
.../covered/update/IndexUpdateManager.java | 5 +-
.../hbase/index/scanner/EmptyScanner.java | 8 +-
.../phoenix/hbase/index/scanner/Scanner.java | 7 +-
.../hbase/index/scanner/ScannerBuilder.java | 12 +-
.../hbase/index/write/IndexFailurePolicy.java | 4 +-
.../apache/phoenix/index/IndexMaintainer.java | 28 +-
.../phoenix/index/PhoenixIndexBuilder.java | 90 +--
.../apache/phoenix/index/PhoenixIndexCodec.java | 145 ++---
.../index/PhoenixIndexFailurePolicy.java | 2 +-
.../phoenix/index/PhoenixTxIndexBuilder.java | 53 ++
.../index/PhoenixTxIndexFailurePolicy.java | 50 ++
.../query/ConnectionQueryServicesImpl.java | 10 +-
.../phoenix/schema/tuple/ValueGetterTuple.java | 16 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 64 +-
.../covered/CoveredIndexCodecForTesting.java | 93 ++-
.../index/covered/TestLocalTableState.java | 20 +-
.../example/TestCoveredColumnIndexCodec.java | 16 +-
.../phoenix/index/IndexMaintainerTest.java | 2 +-
36 files changed, 1904 insertions(+), 1652 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 84c6827..fa85f00 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -59,7 +59,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
- * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple
+ * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple
* {@link IndexCodec} and BatchCache implementation.
*/
@Category(NeedsOwnMiniClusterTest.class)
@@ -151,7 +151,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
int count = 0;
- KeyValue kv;
+ Cell kv;
while ((kv = kvs.next()) != null) {
Cell next = expectedKvs.get(count++);
assertEquals(
@@ -302,9 +302,9 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
Map<String, String> indexerOpts = new HashMap<String, String>();
// just need to set the codec - we are going to set it later, but we need something here or the
// initializer blows up.
- indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
+ indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
CoveredIndexCodecForTesting.class.getName());
- Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
+ Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
// create the table
HBaseAdmin admin = UTIL.getHBaseAdmin();
@@ -315,8 +315,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
Indexer indexer =
(Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
- CoveredColumnsIndexBuilder builder =
- (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting();
+ NonTxIndexBuilder builder =
+ (NonTxIndexBuilder) indexer.getBuilderForTesting();
VerifyingIndexCodec codec = new VerifyingIndexCodec();
builder.setIndexCodecForTesting(codec);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index dbe78e7..281ad63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered.example;
@@ -32,6 +24,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -49,105 +42,106 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
/**
- * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
- * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
- * the exception, and just return <tt>null</tt> to the client, which then just goes and retries.
+ * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor),
+ * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return
+ * <tt>null</tt> to the client, which then just goes and retries.
*/
@Category(NeedsOwnMiniClusterTest.class)
public class FailWithoutRetriesIT {
- private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
- @Rule
- public TableName table = new TableName();
+ private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
+ @Rule
+ public TableName table = new TableName();
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private String getIndexTableName() {
+ return Bytes.toString(table.getTableName()) + "_index";
+ }
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ public static class FailingTestCodec extends BaseIndexCodec {
- private String getIndexTableName() {
- return Bytes.toString(table.getTableName()) + "_index";
- }
+ @Override
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+ throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
+ }
- public static class FailingTestCodec extends BaseIndexCodec {
+ @Override
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+ throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
+ }
+
+ @Override
+ public void setContext(TableState state, Mutation mutation) throws IOException {}
+
+ }
- @Override
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
- throw new RuntimeException("Intentionally failing deletes for "
- + FailWithoutRetriesIT.class.getName());
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // setup and verify the config
+ Configuration conf = UTIL.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ IndexTestingUtils.setupConfig(conf);
+ IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+ // start the cluster
+ UTIL.startMiniCluster();
}
- @Override
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
- throw new RuntimeException("Intentionally failing upserts for "
- + FailWithoutRetriesIT.class.getName());
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
}
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- // setup and verify the config
- Configuration conf = UTIL.getConfiguration();
- setUpConfigForMiniCluster(conf);
- IndexTestingUtils.setupConfig(conf);
- IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
- // start the cluster
- UTIL.startMiniCluster();
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- /**
- * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
- * rethrowing the exception correctly?
- * <p>
- * We use a custom codec to enforce the thrown exception.
- * @throws Exception
- */
- @Test(timeout = 300000)
- public void testQuickFailure() throws Exception {
- // incorrectly setup indexing for the primary table - target index table doesn't exist, which
- // should quickly return to the client
- byte[] family = Bytes.toBytes("family");
- ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
- // values are [col1]
- fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
- CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
- // add the index family
- builder.addIndexGroup(fam1);
- // usually, we would create the index table here, but we don't for the sake of the test.
-
- // setup the primary table
- String primaryTable = Bytes.toString(table.getTableName());
- @SuppressWarnings("deprecation")
- HTableDescriptor pTable = new HTableDescriptor(primaryTable);
- pTable.addFamily(new HColumnDescriptor(family));
- // override the codec so we can use our test one
- builder.build(pTable, FailingTestCodec.class);
-
- // create the primary table
- HBaseAdmin admin = UTIL.getHBaseAdmin();
- admin.createTable(pTable);
- Configuration conf = new Configuration(UTIL.getConfiguration());
- // up the number of retries/wait time to make it obvious that we are failing with retries here
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
- conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
- HTable primary = new HTable(conf, primaryTable);
- primary.setAutoFlush(false, true);
-
- // do a simple put that should be indexed
- Put p = new Put(Bytes.toBytes("row"));
- p.add(family, null, Bytes.toBytes("value"));
- primary.put(p);
- try {
- primary.flushCommits();
- fail("Shouldn't have gotten a successful write to the primary table");
- } catch (RetriesExhaustedWithDetailsException e) {
- LOG.info("Correclty got a failure of the put!");
+ /**
+ * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception
+ * correctly?
+ * <p>
+ * We use a custom codec to enforce the thrown exception.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testQuickFailure() throws Exception {
+ // incorrectly setup indexing for the primary table - target index table doesn't exist, which
+ // should quickly return to the client
+ byte[] family = Bytes.toBytes("family");
+ ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
+ // values are [col1]
+ fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+ CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+ // add the index family
+ builder.addIndexGroup(fam1);
+ // usually, we would create the index table here, but we don't for the sake of the test.
+
+ // setup the primary table
+ String primaryTable = Bytes.toString(table.getTableName());
+ @SuppressWarnings("deprecation")
+ HTableDescriptor pTable = new HTableDescriptor(primaryTable);
+ pTable.addFamily(new HColumnDescriptor(family));
+ // override the codec so we can use our test one
+ builder.build(pTable, FailingTestCodec.class);
+
+ // create the primary table
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ admin.createTable(pTable);
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // up the number of retries/wait time to make it obvious that we are failing with retries here
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+ conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
+ HTable primary = new HTable(conf, primaryTable);
+ primary.setAutoFlush(false, true);
+
+ // do a simple put that should be indexed
+ Put p = new Put(Bytes.toBytes("row"));
+ p.add(family, null, Bytes.toBytes("value"));
+ primary.put(p);
+ try {
+ primary.flushCommits();
+ fail("Shouldn't have gotten a successful write to the primary table");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ LOG.info("Correclty got a failure of the put!");
+ }
+ primary.close();
}
- primary.close();
- }
}
\ No newline at end of file
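
A minimal sketch of the exception contract this test guards (not part of the commit; the class name is hypothetical). HBase reconstructs remote exceptions reflectively through a constructor taking the serialized message, so a DoNotRetryIOException subclass must expose a String constructor; otherwise the client deserializes null and keeps retrying:

    import org.apache.hadoop.hbase.DoNotRetryIOException;

    public class QuickFailIndexException extends DoNotRetryIOException {
        // Required: the remote-exception machinery looks up a constructor
        // taking a single String (the serialized message) to rebuild the
        // exception on the client side.
        public QuickFailIndexException(String message) {
            super(message);
        }
    }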
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index a6e36cb..bcadc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.hbase.index;
import java.io.IOException;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
public interface ValueGetter {
@@ -32,7 +32,7 @@ public interface ValueGetter {
* present.
* @throws IOException if there is an error accessing the underlying data storage
*/
- public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException;
public byte[] getRowKey();
}
\ No newline at end of file
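
A hedged sketch of the new contract (not part of the commit; the map-backed lookup is illustrative only). Implementations now return the more general ImmutableBytesWritable instead of Phoenix's ImmutableBytesPtr:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class MapBackedValueGetter implements ValueGetter {
        private final byte[] rowKey;
        private final Map<ColumnReference, byte[]> latestValues;

        public MapBackedValueGetter(byte[] rowKey, Map<ColumnReference, byte[]> latestValues) {
            this.rowKey = rowKey;
            this.latestValues = latestValues;
        }

        @Override
        public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
            byte[] value = latestValues.get(ref);
            // a null return signals that the column is not present for this row
            return value == null ? null : new ImmutableBytesWritable(value);
        }

        @Override
        public byte[] getRowKey() {
            return rowKey;
        }
    }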
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f9df296..dfb9ad4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -1,95 +1,127 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.builder;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
/**
* Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
* <p>
- * You should extend this class, rather than implementing IndexBuilder directly to maintain
- * compatability going forward.
+ * You should extend this class, rather than implementing IndexBuilder directly, to maintain compatibility going forward.
* <p>
- * Generally, you should consider using one of the implemented IndexBuilders (e.g
- * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
- * up-to-date.
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g. {@link NonTxIndexBuilder}) as there is
+ * a lot of work required to keep an index table up-to-date.
*/
public abstract class BaseIndexBuilder implements IndexBuilder {
+ public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
+ private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
- private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
- protected boolean stopped;
+ protected boolean stopped;
+ protected RegionCoprocessorEnvironment env;
+ protected IndexCodec codec;
- @Override
- public void extendBaseIndexBuilderInstead() { }
-
- @Override
- public void setup(RegionCoprocessorEnvironment conf) throws IOException {
- // noop
- }
+ abstract protected boolean useRawScanToPrimeBlockCache();
+
+ @Override
+ public void extendBaseIndexBuilderInstead() {}
- @Override
- public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- // noop
- }
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ this.env = env;
+ // set up the phoenix codec. Generally, this will just be the standard one, but abstracting here
+ // so we can use it later when generalizing covered indexes
+ Configuration conf = env.getConfiguration();
+ Class<? extends IndexCodec> codecClass = conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+ try {
+ Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
+ meth.setAccessible(true);
+ this.codec = meth.newInstance();
+ this.codec.initialize(env);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
- @Override
- public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
- // noop
- }
-
- /**
- * By default, we always attempt to index the mutation. Commonly this can be slow (because the
- * framework spends the time to do the indexing, only to realize that you don't need it) or not
- * ideal (if you want to turn on/off indexing on a table without completely reloading it).
- * @throws IOException
- */
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return true;
- }
+ @Override
+ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ // noop
+ }
- /**
- * {@inheritDoc}
- * <p>
- * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each
- * mutation always applies to different rows, even if they are in the same batch, or are
- * independent updates.
- */
- @Override
- public byte[] getBatchId(Mutation m) {
- return null;
- }
+ @Override
+ public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ // noop
+ }
- @Override
- public void stop(String why) {
- LOG.debug("Stopping because: " + why);
- this.stopped = true;
- }
+ /**
+ * By default, we always attempt to index the mutation. Commonly this can be slow (because the framework spends the
+ * time to do the indexing, only to realize that you don't need it) or not ideal (if you want to turn on/off
+ * indexing on a table without completely reloading it).
+ *
+ * @throws IOException
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ // ask the codec to see if we should even attempt indexing
+ return this.codec.isEnabled(m);
+ }
- @Override
- public boolean isStopped() {
- return this.stopped;
- }
+ /**
+ * Exposed for testing!
+ *
+ * @param codec
+ * codec to use for this instance of the builder
+ */
+ public void setIndexCodecForTesting(IndexCodec codec) {
+ this.codec = codec;
+ }
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies
+ * to different rows, even if they are in the same batch, or are independent updates.
+ */
+ @Override
+ public byte[] getBatchId(Mutation m) {
+ return this.codec.getBatchId(m);
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.debug("Stopping because: " + why);
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
}
\ No newline at end of file
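
With codec construction and lifecycle plumbing pulled up into BaseIndexBuilder.setup(), a concrete builder mainly supplies getIndexUpdate(Mutation) plus the new useRawScanToPrimeBlockCache() hook. A minimal sketch under those assumptions (not part of the commit; the no-op body is illustrative only):

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;

    public class NoopIndexBuilder extends BaseIndexBuilder {

        @Override
        protected boolean useRawScanToPrimeBlockCache() {
            // a raw scan would also pull delete markers into the block cache;
            // a builder that emits no index updates gains nothing from that
            return false;
        }

        @Override
        public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
            // each pair is (index mutation, index table name); nothing to emit here
            return Collections.emptyList();
        }
    }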
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index ba9534c..d5fd34d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -156,19 +155,6 @@ public class IndexBuildManager implements Stoppable {
return results;
}
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
- // all we get is a single update, so it would probably just go slower if we needed to queue it
- // up. It will increase underlying resource contention a little bit, but the mutation case is
- // far more common, so let's not worry about it for now.
- // short circuit so we don't waste time.
- if (!this.delegate.isEnabled(delete)) {
- return null;
- }
-
- return delegate.getIndexUpdate(delete);
-
- }
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered) throws IOException {
// this is run async, so we can take our time here
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index b91a52a..194fdcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -79,32 +78,26 @@ public interface IndexBuilder extends Stoppable {
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
- /**
- * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
- * index tables based on the delete of the primary table row. This is only called for cases where
- * the client sends a single delete ({@link HTable#delete}). We separate this method from
- * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
- * subtly different semantics for updating the families/timestamps from the generic batch path.
- * <p>
- * Its up to your implementation to ensure that timestamps match between the primary and index
- * tables.
- * <p>
- * Implementers must ensure that this method is thread-safe - it could (and probably will) be
- * called concurrently for different mutations, which may or may not be part of the same batch.
- * @param delete {@link Delete} to the primary table that may be indexed
- * @return a {@link Map} of the mutations to make -> target index table name
- * @throws IOException on failure
- */
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
-
- /**
- * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal
- * flush or compaction mechanisms.
- * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
- * output from HBase.
- * @return a {@link Map} of the mutations to make -> target index table name
- * @throws IOException on failure
- */
+ /**
+ * Build an index update to clean up the index when we remove {@link KeyValue}s via the normal flush or compaction
+ * mechanisms. Currently neither implemented by any implementors nor called, but left here to be implemented if we
+ * ever need it. In Jesse's words:
+ *
+ * Arguably, this is a correctness piece that should be used, but isn't. Basically, it *could* be that
+ * if a compaction/flush were to remove a key (too old, too many versions) you might want to clean up the index table
+ * as well, if it were to get out of sync with the primary table. For instance, you might get multiple versions of
+ * the same row, which should eventually age off the oldest version. However, in the index table there would only
+ * ever be two entries for that row - the first one, matching the original row, and the delete marker for the index
+ * update, set when we got a newer version of the primary row. So, a basic HBase scan wouldn't show the index update
+ * b/c it's covered by the delete marker, but an older timestamp-based read would actually show the index row, even
+ * after the primary table row is gone due to the MAX_VERSIONS requirement.
+ *
+ * @param filtered {@link KeyValue}s that previously existed, but won't be included
+ * in further output from HBase.
+ *
+ * @return a {@link Map} of the mutations to make -> target index table name
+ * @throws IOException on failure
+ */
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered)
throws IOException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
index e707ea2..5e0da3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
/**
@@ -27,9 +28,9 @@ import org.apache.hadoop.hbase.KeyValue;
*/
public class Batch {
- private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+ private static final byte pointDeleteCode = KeyValue.Type.Delete.getCode();
private final long timestamp;
- private List<KeyValue> batch = new ArrayList<KeyValue>();
+ private List<Cell> batch = new ArrayList<Cell>();
private boolean allPointDeletes = true;
/**
@@ -39,8 +40,8 @@ public class Batch {
this.timestamp = ts;
}
- public void add(KeyValue kv){
- if (pointDeleteCode != kv.getType()) {
+ public void add(Cell kv){
+ if (pointDeleteCode != kv.getTypeByte()) {
allPointDeletes = false;
}
batch.add(kv);
@@ -54,7 +55,7 @@ public class Batch {
return this.timestamp;
}
- public List<KeyValue> getKvs() {
+ public List<Cell> getKvs() {
return this.batch;
}
}
\ No newline at end of file
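
With Batch now carrying Cells, the intended use is unchanged: group a mutation's cells by timestamp so each batch can be applied to the index at one consistent ts. A sketch of that grouping (not part of the commit; the helper name is ours):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.hbase.index.covered.Batch;

    public class TimestampBatching {
        static Map<Long, Batch> batchByTimestamp(Iterable<Cell> cells) {
            Map<Long, Batch> batches = new HashMap<Long, Batch>();
            for (Cell cell : cells) {
                long ts = cell.getTimestamp();
                Batch batch = batches.get(ts);
                if (batch == null) {
                    batch = new Batch(ts);
                    batches.put(ts, batch);
                }
                // add() also tracks whether the batch consists solely of point deletes
                batch.add(cell);
            }
            return batches;
        }
    }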
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
deleted file mode 100644
index 6524fd4..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-/**
- * Build covered indexes for phoenix updates.
- * <p>
- * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
- * need to do any extra synchronization in the IndexBuilder.
- * <p>
- * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or
- * flush, leading to a bloated index that needs to be cleaned up by a background process.
- */
-public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
-
- private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
- public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
-
- protected RegionCoprocessorEnvironment env;
- protected IndexCodec codec;
- protected LocalHBaseState localTable;
-
- @Override
- public void setup(RegionCoprocessorEnvironment env) throws IOException {
- this.env = env;
- // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here
- // so we can use it later when generalizing covered indexes
- Configuration conf = env.getConfiguration();
- Class<? extends IndexCodec> codecClass =
- conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
- try {
- Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
- meth.setAccessible(true);
- this.codec = meth.newInstance();
- this.codec.initialize(env);
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- this.localTable = new LocalTable(env);
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
- // build the index updates for each group
- IndexUpdateManager updateMap = new IndexUpdateManager();
-
- batchMutationAndAddUpdates(updateMap, mutation);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
- }
-
- return updateMap.toMap();
- }
-
- /**
- * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each
- * key-value in the update to see if it matches the others. Generally, this will be the case, but
- * you can add kvs to a mutation that don't all have the timestamp, so we need to manage
- * everything in batches based on timestamp.
- * <p>
- * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
- * @param updateMap index updates into which to add new updates. Modified as a side-effect.
- * @param state current state of the row for the mutation.
- * @param m mutation to batch
- * @throws IOException
- */
- private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
- // split the mutation into timestamp-based batches
- Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-
- // create a state manager, so we can manage each batch
- LocalTableState state = new LocalTableState(env, localTable, m);
-
- // go through each batch of keyvalues and build separate index entries for each
- boolean cleanupCurrentState = true;
- for (Batch batch : batches) {
- /*
- * We have to split the work between the cleanup and the update for each group because when we
- * update the current state of the row for the current batch (appending the mutations for the
- * current batch) the next group will see that as the current state, which will can cause the
- * a delete and a put to be created for the next group.
- */
- if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
- cleanupCurrentState = false;
- }
- }
- }
-
- /**
- * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
- * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
- * the time the method is called.
- * @param m {@link Mutation} from which to extract the {@link KeyValue}s
- * @return the mutation, broken into batches and sorted in ascending order (smallest first)
- */
- protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
- Map<Long, Batch> batches = new HashMap<Long, Batch>();
- for (List<Cell> family : m.getFamilyCellMap().values()) {
- List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
- createTimestampBatchesFromKeyValues(familyKVs, batches);
- }
- // sort the batches
- List<Batch> sorted = new ArrayList<Batch>(batches.values());
- Collections.sort(sorted, new Comparator<Batch>() {
- @Override
- public int compare(Batch o1, Batch o2) {
- return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
- }
- });
- return sorted;
- }
-
- /**
- * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
- * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
- * the time the method is called.
- * @param kvs {@link KeyValue}s to break into batches
- * @param batches to update with the given kvs
- */
- protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
- Map<Long, Batch> batches) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- byte[] nowBytes = Bytes.toBytes(now);
-
- // batch kvs by timestamp
- for (KeyValue kv : kvs) {
- long ts = kv.getTimestamp();
- // override the timestamp to the current time, so the index and primary tables match
- // all the keys with LATEST_TIMESTAMP will then be put into the same batch
- if (kv.updateLatestStamp(nowBytes)) {
- ts = now;
- }
- Batch batch = batches.get(ts);
- if (batch == null) {
- batch = new Batch(ts);
- batches.put(ts, batch);
- }
- batch.add(kv);
- }
- }
-
- /**
- * For a single batch, get all the index updates and add them to the updateMap
- * <p>
- * This method manages cleaning up the entire history of the row from the given timestamp forward
- * for out-of-order (e.g. 'back in time') updates.
- * <p>
- * If things arrive out of order (client is using custom timestamps) we should still see the index
- * in the correct order (assuming we scan after the out-of-order update in finished). Therefore,
- * we when we aren't the most recent update to the index, we need to delete the state at the
- * current timestamp (similar to above), but also issue a delete for the added index updates at
- * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so
- * it looks like it was also deleted at that next newest timestamp. However, its not enough to
- * just update the one in front of us - that column will likely be applied to index entries up the
- * entire history in front of us, which also needs to be fixed up.
- * <p>
- * However, the current update usually will be the most recent thing to be added. In that case,
- * all we need to is issue a delete for the previous index row (the state of the row, without the
- * update applied) at the current timestamp. This gets rid of anything currently in the index for
- * the current state of the row (at the timestamp). Then we can just follow that by applying the
- * pending update and building the index update based on the new row state.
- * @param updateMap map to update with new index elements
- * @param batch timestamp-based batch of edits
- * @param state local state to update and pass to the codec
- * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the
- * current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
- * indicates we should not attempt the cleanup, e.g. an earlier batch already did the
- * cleanup.
- * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
- * <tt>false</tt> otherwise
- * @throws IOException
- */
- private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
- LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
-
- // need a temporary manager for the current batch. It should resolve any conflicts for the
- // current batch. Essentially, we can get the case where a batch doesn't change the current
- // state of the index (all Puts are covered by deletes), in which case we don't want to add
- // anything
- // A. Get the correct values for the pending state in the batch
- // A.1 start by cleaning up the current state - as long as there are key-values in the batch
- // that are indexed, we need to change the current state of the index. Its up to the codec to
- // determine if we need to make any cleanup given the pending update.
- long batchTs = batch.getTimestamp();
- state.setPendingUpdates(batch.getKvs());
- addCleanupForCurrentBatch(updateMap, batchTs, state);
-
- // A.2 do a single pass first for the updates to the current state
- state.applyPendingUpdates();
- long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
- // if all the updates are the latest thing in the index, we are done - don't go and fix history
- if (ColumnTracker.isNewestTime(minTs)) {
- return false;
- }
-
- // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
- // index. after this, we have the correct view of the index, from the batch up to the index
- while(!ColumnTracker.isNewestTime(minTs) ){
- minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
- }
-
- // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
- if (requireCurrentStateCleanup) {
- // roll back the pending update. This is needed so we can remove all the 'old' index entries.
- // We don't need to do the puts here, but just the deletes at the given timestamps since we
- // just want to completely hide the incorrect entries.
- state.rollback(batch.getKvs());
- // setup state
- state.setPendingUpdates(batch.getKvs());
-
- // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
- // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
- // because the update may have a different set of columns or value based on the update).
- cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
-
- // have to roll the state forward again, so the current state is correct
- state.applyPendingUpdates();
- return true;
- }
- return false;
- }
-
- private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
- IndexUpdateManager updateMap) throws IOException {
- state.setCurrentTimestamp(ts);
- ts = addCurrentStateMutationsForBatch(updateMap, state);
- return ts;
- }
-
- private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
- LocalTableState state) throws IOException {
- // get the cleanup for the current state
- state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
- // ignore any index tracking from the delete
- state.resetTrackedColumns();
- }
-
- /**
- * Add the necessary mutations for the pending batch on the local state. Handles rolling up
- * through history to determine the index changes after applying the batch (for the case where the
- * batch is back in time).
- * @param updateMap to update with index mutations
- * @param batch to apply to the current state
- * @param state current state of the table
- * @return the minimum timestamp across all index columns requested. If
- * {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
- * timestamp, we know that this <i>was not a back-in-time update</i>.
- * @throws IOException
- */
- private long
- addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException {
-
- // get the index updates for this current batch
- Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
- state.resetTrackedColumns();
-
- /*
- * go through all the pending updates. If we are sure that all the entries are the latest
- * timestamp, we can just add the index updates and move on. However, if there are columns that
- * we skip past (based on the timestamp of the batch), we need to roll back up the history.
- * Regardless of whether or not they are the latest timestamp, the entries here are going to be
- * correct for the current batch timestamp, so we add them to the updates. The only thing we
- * really care about it if we need to roll up the history and fix it as we go.
- */
- // timestamp of the next update we need to track
- long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
- for (IndexUpdate update : upserts) {
- // this is the one bit where we check the timestamps
- final ColumnTracker tracker = update.getIndexedColumns();
- long trackerTs = tracker.getTS();
- // update the next min TS we need to track
- if (trackerTs < minTs) {
- minTs = tracker.getTS();
- }
- // track index hints for the next round. Hint if we need an update for that column for the
- // next timestamp. These columns clearly won't need to update as we go through time as they
- // already match the most recent possible thing.
- boolean needsCleanup = false;
- if (tracker.hasNewerTimestamps()) {
- columnHints.add(tracker);
- // this update also needs to be cleaned up at the next timestamp because it not the latest.
- needsCleanup = true;
- }
-
-
- // only make the put if the index update has been setup
- if (update.isValid()) {
- byte[] table = update.getTableName();
- Mutation mutation = update.getUpdate();
- updateMap.addIndexUpdate(table, mutation);
-
- // only make the cleanup if we made a put and need cleanup
- if (needsCleanup) {
- // there is a TS for the interested columns that is greater than the columns in the
- // put. Therefore, we need to issue a delete at the same timestamp
- Delete d = new Delete(mutation.getRow());
- d.setTimestamp(tracker.getTS());
- updateMap.addIndexUpdate(table, d);
- }
- }
- }
- return minTs;
- }
-
- /**
- * Cleanup the index based on the current state from the given batch. Iterates over each timestamp
- * (for the indexed rows) for the current state of the table and cleans up all the existing
- * entries generated by the codec.
- * <p>
- * Adds all pending updates to the updateMap
- * @param updateMap updated with the pending index updates from the codec
- * @param batchTs timestamp from which we should cleanup
- * @param state current state of the primary table. Should already by setup to the correct state
- * from which we want to cleanup.
- * @throws IOException
- */
- private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
- long batchTs, LocalTableState state) throws IOException {
- // get the cleanup for the current state
- state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs);
- Set<ColumnTracker> trackers = state.getTrackedColumns();
- long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- for (ColumnTracker tracker : trackers) {
- if (tracker.getTS() < minTs) {
- minTs = tracker.getTS();
- }
- }
- state.resetTrackedColumns();
- if (!ColumnTracker.isNewestTime(minTs)) {
- state.setHints(Lists.newArrayList(trackers));
- cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
- }
- }
-
-
- /**
- * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
- * add them to the update map.
- * <p>
- * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates
- * applied, etc).
- * @throws IOException
- */
- protected void
- addDeleteUpdatesToMap(IndexUpdateManager updateMap,
- LocalTableState state, long ts) throws IOException {
- Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
- if (cleanup != null) {
- for (IndexUpdate d : cleanup) {
- if (!d.isValid()) {
- continue;
- }
- // override the timestamps in the delete to match the current batch.
- Delete remove = (Delete)d.getUpdate();
- remove.setTimestamp(ts);
- updateMap.addIndexUpdate(d.getTableName(), remove);
- }
- }
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
- // stores all the return values
- IndexUpdateManager updateMap = new IndexUpdateManager();
-
- // We have to figure out which kind of delete it is, since we need to do different things if its
- // a general (row) delete, versus a delete of just a single column or family
- Map<byte[], List<Cell>> families = d.getFamilyCellMap();
-
- /*
- * Option 1: its a row delete marker, so we just need to delete the most recent state for each
- * group, as of the specified timestamp in the delete. This can happen if we have a single row
- * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
- * bug?). In a single delete, this delete gets all the column families appended, so the family
- * map won't be empty by the time it gets here.
- */
- if (families.size() == 0) {
- LocalTableState state = new LocalTableState(env, localTable, d);
- // get a consistent view of name
- long now = d.getTimeStamp();
- if (now == HConstants.LATEST_TIMESTAMP) {
- now = EnvironmentEdgeManager.currentTimeMillis();
- // update the delete's idea of 'now' to be consistent with the index
- d.setTimestamp(now);
- }
- // get deletes from the codec
- // we only need to get deletes and not add puts because this delete covers all columns
- addDeleteUpdatesToMap(updateMap, state, now);
-
- /*
- * Update the current state for all the kvs in the delete. Generally, we would just iterate
- * the family map, but since we go here, the family map is empty! Therefore, we need to fake a
- * bunch of family deletes (just like hos HRegion#prepareDelete works). This is just needed
- * for current version of HBase that has an issue where the batch update doesn't update the
- * deletes before calling the hook.
- */
- byte[] deleteRow = d.getRow();
- for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
- state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
- KeyValue.Type.DeleteFamily));
- }
- } else {
- // Option 2: Its actually a bunch single updates, which can have different timestamps.
- // Therefore, we need to do something similar to the put case and batch by timestamp
- batchMutationAndAddUpdates(updateMap, d);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
- }
-
- return updateMap.toMap();
- }
-
- @Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered) throws IOException {
- // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
- return null;
- }
-
- /**
- * Exposed for testing!
- * @param codec codec to use for this instance of the builder
- */
- public void setIndexCodecForTesting(IndexCodec codec) {
- this.codec = codec;
- }
-
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- // ask the codec to see if we should even attempt indexing
- return this.codec.isEnabled(m);
- }
-}
\ No newline at end of file
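
Per the diffstat above, the non-transactional logic of the deleted class moves to NonTxIndexBuilder, with TxIndexBuilder covering the transactional path. For reference, a sketch of the cleanup step the deleted code performed (the method body follows the deleted addDeleteUpdatesToMap; the free-standing wrapper class is ours): the codec's index deletes are pinned to the batch timestamp so index history stays aligned with the primary row.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.phoenix.hbase.index.covered.IndexCodec;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;
    import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;

    public class DeleteCleanupSketch {
        static void addDeleteUpdatesToMap(IndexCodec codec, IndexUpdateManager updateMap,
                TableState state, long ts) throws IOException {
            Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
            if (cleanup == null) {
                return;
            }
            for (IndexUpdate d : cleanup) {
                if (!d.isValid()) {
                    continue;
                }
                // override the timestamp in the delete to match the current batch
                Delete remove = (Delete) d.getUpdate();
                remove.setTimestamp(ts);
                updateMap.addIndexUpdate(d.getTableName(), remove);
            }
        }
    }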
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index daa631b..e3ef831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
@@ -22,89 +14,93 @@ import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
import org.apache.phoenix.index.BaseIndexCodec;
-
/**
* Codec for creating index updates from the current state of a table.
* <p>
- * Generally, you should extend {@link BaseIndexCodec} instead, so help maintain compatibility as
- * features need to be added to the codec, as well as potentially not haivng to implement some
- * methods.
+ * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as features need to be
+ * added to the codec, as well as potentially not having to implement some methods.
*/
public interface IndexCodec {
- /**
- * Do any code initialization necessary
- * @param env environment in which the codec is operating
- * @throws IOException if the codec cannot be initalized correctly
- */
- public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+ /**
+ * Do any codec initialization necessary
+ *
+ * @param env
+ * environment in which the codec is operating
+ * @throws IOException
+ * if the codec cannot be initialized correctly
+ */
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+ /**
+ * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
+ * specified and no columns are returned) mapped to the table name. For instance, if you have an index 'myIndex'
+ * with row:
+ *
+ * <pre>
+ * v1,v2,v3 | CF:CQ0 | rowkey
+ * | CF:CQ1 | rowkey
+ * </pre>
+ *
+ * To then clean up this entry, you would just return 'v1,v2,v3', 'myIndex'.
+ *
+ * @param state
+ * the current state of the table that needs to be cleaned up. Generally, you only care about the latest
+ * column values, for each column you are indexing for each index table.
+ * @return the pairs of (deletes, index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
- /**
- * Get the index cleanup entries. Currently, this must return just single row deletes (where just
- * the row-key is specified and no columns are returned) mapped to the table name. For instance,
- * to you have an index 'myIndex' with row :
- *
- * <pre>
- * v1,v2,v3 | CF:CQ0 | rowkey
- * | CF:CQ1 | rowkey
- * </pre>
- *
- * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
- * @param state the current state of the table that needs to be cleaned up. Generally, you only
- * care about the latest column values, for each column you are indexing for each index
- * table.
- * @return the pairs of (deletes, index table name) that should be applied.
- * @throws IOException
- */
- public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+ // table state has the pending update already applied, before calling
+ // get the new index entries
+ /**
+ * Get the index updates for the primary table state, for each index table. The returned {@link Put}s need to be
+ * fully specified (including timestamp) to avoid making multiple passes over the same key-values.
+ * <p>
+ * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so the index entries
+ * match the primary table row. This could be managed at a higher level, but would require iterating all the kvs in
+ * the Put again - very inefficient when compared to the current interface where you must provide a timestamp
+ * anyway (so you might as well provide the right one).
+ *
+ * @param state
+ * the current state of the table that needs an index update. Generally, you only care about the latest
+ * column values, for each column you are indexing for each index table.
+ * @return the pairs of (updates,index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
- // table state has the pending update already applied, before calling
- // get the new index entries
- /**
- * Get the index updates for the primary table state, for each index table. The returned
- * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
- * key-values multiple times.
- * <p>
- * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
- * the index entries match the primary table row. This could be managed at a higher level, but
- * would require iterating all the kvs in the Put again - very inefficient when compared to the
- * current interface where you must provide a timestamp anyways (so you might as well provide the
- * right one).
- * @param state the current state of the table that needs to an index update Generally, you only
- * care about the latest column values, for each column you are indexing for each index
- * table.
- * @return the pairs of (updates,index table name) that should be applied.
- * @throws IOException
- */
- public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+ /**
+ * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
+ * take place, we can save a lot of time on the regular Put path. By making it dynamic, we avoid having to offline and
+ * then online a table just to turn indexing on or off.
+ * <p>
+ * We can also be smart about whether to index a given update here - if the update doesn't contain any columns that
+ * we care about indexing, we can save the effort of analyzing the put any further.
+ *
+ * @param m
+ * mutation that should be indexed.
+ * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table basis, as each
+ * codec is instantiated per-region.
+ * @throws IOException
+ */
+ public boolean isEnabled(Mutation m) throws IOException;
- /**
- * This allows the codec to dynamically change whether or not indexing should take place for a
- * table. If it doesn't take place, we can save a lot of time on the regular Put patch. By making
- * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
- * <p>
- * We can also be smart about even indexing a given update here too - if the update doesn't
- * contain any columns that we care about indexing, we can save the effort of analyzing the put
- * and further.
- * @param m mutation that should be indexed.
- * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
- * basis, as each codec is instantiated per-region.
- * @throws IOException
- */
- public boolean isEnabled(Mutation m) throws IOException;
+ /**
+ * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
+ * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
+ * <p>
+ * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
+ * batch. Otherwise, we cannot guarantee index correctness.</b>
+ *
+ * @param m
+ * mutation that may or may not be part of the batch
+ * @return an id for the batch, or <tt>null</tt> if the mutation is not part of a batch.
+ */
+ public byte[] getBatchId(Mutation m);
- /**
- * Get the batch identifier of the given mutation. Generally, updates to the table will take place
- * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
- * much more intelligently.
- * <p>
- * <b>If you have batches that have multiple updates to the same row state, you must specify a
- * batch id for each batch. Otherwise, we cannot guarantee index correctness</b>
- * @param m mutation that may or may not be part of the batch
- * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
- */
- public byte[] getBatchId(Mutation m);
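+ /**
+ * Give the codec a chance to set up per-mutation context before index updates are generated for the given
+ * mutation. Implementations may stash whatever state they need in {@link TableState#getContext()}.
+ */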
+ public void setContext(TableState state, Mutation mutation) throws IOException;
}
\ No newline at end of file
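For reference, a minimal no-op implementation of the interface as it now stands; every signature is taken from
the interface above, while the class name and behavior are illustrative only (the in-tree
CoveredIndexCodecForTesting touched by this commit plays a similar role):

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;

/** A do-nothing codec: indexing stays enabled, but no index rows are ever produced. */
public class NoOpIndexCodec implements IndexCodec {

    @Override
    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
        // no codec-level state to set up
    }

    @Override
    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
        // nothing to clean up - this codec never writes index rows
        return Collections.<IndexUpdate> emptyList();
    }

    @Override
    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
        // no new index rows either
        return Collections.<IndexUpdate> emptyList();
    }

    @Override
    public boolean isEnabled(Mutation m) throws IOException {
        // always attempt indexing; a real codec could inspect the mutation here
        return true;
    }

    @Override
    public byte[] getBatchId(Mutation m) {
        // updates are not batched, so no batch id
        return null;
    }

    @Override
    public void setContext(TableState state, Mutation mutation) throws IOException {
        // no per-mutation context needed
    }
}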
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index e4bc193..f47a71a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
@@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -42,203 +34,250 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Maps;
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
- * Currently, this is a single-use object - you need to create a new one for each row that you need
- * to manage. In the future, we could make this object reusable, but for the moment its easier to
- * manage as a throw-away object.
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
+ * future, we could make this object reusable, but for the moment it's easier to manage as a throw-away object.
* <p>
- * This class is <b>not</b> thread-safe - it requires external synchronization is access
- * concurrently.
+ * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently.
*/
public class LocalTableState implements TableState {
- private long ts;
- private RegionCoprocessorEnvironment env;
- private KeyValueStore memstore;
- private LocalHBaseState table;
- private Mutation update;
- private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
- private ScannerBuilder scannerBuilder;
- private List<KeyValue> kvs = new ArrayList<KeyValue>();
- private List<? extends IndexedColumnGroup> hints;
- private CoveredColumns columnSet;
-
- public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
- this.env = environment;
- this.table = table;
- this.update = update;
- this.memstore = new IndexMemStore();
- this.scannerBuilder = new ScannerBuilder(memstore, update);
- this.columnSet = new CoveredColumns();
- }
-
- public void addPendingUpdates(KeyValue... kvs) {
- if (kvs == null) return;
- addPendingUpdates(Arrays.asList(kvs));
- }
-
- public void addPendingUpdates(List<KeyValue> kvs) {
- if(kvs == null) return;
- setPendingUpdates(kvs);
- addUpdate(kvs);
- }
-
- private void addUpdate(List<KeyValue> list) {
- addUpdate(list, true);
- }
-
- private void addUpdate(List<KeyValue> list, boolean overwrite) {
- if (list == null) return;
- for (KeyValue kv : list) {
- this.memstore.add(kv, overwrite);
- }
- }
-
- @Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return this.env;
- }
-
- @Override
- public long getCurrentTimestamp() {
- return this.ts;
- }
-
- @Override
- public void setCurrentTimestamp(long timestamp) {
- this.ts = timestamp;
- }
-
- public void resetTrackedColumns() {
- this.trackedColumns.clear();
- }
-
- public Set<ColumnTracker> getTrackedColumns() {
- return this.trackedColumns;
- }
-
- @Override
- public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException {
- ensureLocalStateInitialized(indexedColumns);
- // filter out things with a newer timestamp and track the column references to which it applies
- ColumnTracker tracker = new ColumnTracker(indexedColumns);
- synchronized (this.trackedColumns) {
- // we haven't seen this set of columns before, so we need to create a new tracker
- if (!this.trackedColumns.contains(tracker)) {
- this.trackedColumns.add(tracker);
- }
- }
-
- Scanner scanner =
- this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
-
- return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
- }
-
- /**
- * Initialize the managed local state. Generally, this will only be called by
- * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside.
- * Even then, there is still fairly low contention as each new Put/Delete will have its own table
- * state.
- */
- private synchronized void ensureLocalStateInitialized(
- Collection<? extends ColumnReference> columns) throws IOException {
- // check to see if we haven't initialized any columns yet
- Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
- // we have all the columns loaded, so we are good to go.
- if (toCover.isEmpty()) {
- return;
- }
-
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
-
- // add the covered columns to the set
- for (ColumnReference ref : toCover) {
- this.columnSet.addColumn(ref);
- }
- }
-
- @Override
- public Map<String, byte[]> getUpdateAttributes() {
- return this.update.getAttributesMap();
- }
-
- @Override
- public byte[] getCurrentRowKey() {
- return this.update.getRow();
- }
-
- public Result getCurrentRowState() {
- KeyValueScanner scanner = this.memstore.getScanner();
- List<Cell> kvs = new ArrayList<Cell>();
- while (scanner.peek() != null) {
- try {
- kvs.add(scanner.next());
- } catch (IOException e) {
- // this should never happen - something has gone terribly arwy if it has
- throw new RuntimeException("Local MemStore threw IOException!");
- }
- }
- return Result.create(kvs);
- }
-
- /**
- * Helper to add a {@link Mutation} to the values stored for the current row
- * @param pendingUpdate update to apply
- */
- public void addUpdateForTesting(Mutation pendingUpdate) {
- for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
- List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
- addUpdate(edits);
- }
- }
-
- /**
- * @param hints
- */
- public void setHints(List<? extends IndexedColumnGroup> hints) {
- this.hints = hints;
- }
-
- @Override
- public List<? extends IndexedColumnGroup> getIndexColumnHints() {
- return this.hints;
- }
-
- @Override
- public Collection<KeyValue> getPendingUpdate() {
- return this.kvs;
- }
-
- /**
- * Set the {@link KeyValue}s in the update for which we are currently building an index update,
- * but don't actually apply them.
- * @param update pending {@link KeyValue}s
- */
- public void setPendingUpdates(Collection<KeyValue> update) {
- this.kvs.clear();
- this.kvs.addAll(update);
- }
-
- /**
- * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
- */
- public void applyPendingUpdates() {
- this.addUpdate(kvs);
- }
-
- /**
- * Rollback all the given values from the underlying state.
- * @param values
- */
- public void rollback(Collection<KeyValue> values) {
- for (KeyValue kv : values) {
- this.memstore.rollback(kv);
- }
- }
+ private long ts;
+ private RegionCoprocessorEnvironment env;
+ private KeyValueStore memstore;
+ private LocalHBaseState table;
+ private Mutation update;
+ private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+ private ScannerBuilder scannerBuilder;
+ private List<Cell> kvs = new ArrayList<Cell>();
+ private List<? extends IndexedColumnGroup> hints;
+ private CoveredColumns columnSet;
+ private final Map<String,Object> context = Maps.newHashMap();
+
+ public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+ this.env = environment;
+ this.table = table;
+ this.update = update;
+ this.memstore = new IndexMemStore();
+ this.scannerBuilder = new ScannerBuilder(memstore, update);
+ this.columnSet = new CoveredColumns();
+ }
+
+ public void addPendingUpdates(Cell... kvs) {
+ if (kvs == null) return;
+ addPendingUpdates(Arrays.asList(kvs));
+ }
+
+ public void addPendingUpdates(List<Cell> kvs) {
+ if (kvs == null) return;
+ setPendingUpdates(kvs);
+ addUpdate(kvs);
+ }
+
+ private void addUpdate(List<Cell> list) {
+ addUpdate(list, true);
+ }
+
+ private void addUpdate(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ for (Cell kv : list) {
+ this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite);
+ }
+ }
+
+ @Override
+ public RegionCoprocessorEnvironment getEnvironment() {
+ return this.env;
+ }
+
+ @Override
+ public long getCurrentTimestamp() {
+ return this.ts;
+ }
+
+ /**
+ * Set the timestamp up to which access to the underlying table state is allowed.
+ * This overrides the timestamp view provided by the indexer - use with care!
+ * @param timestamp timestamp up to which the table should allow access.
+ */
+ public void setCurrentTimestamp(long timestamp) {
+ this.ts = timestamp;
+ }
+
+ public void resetTrackedColumns() {
+ this.trackedColumns.clear();
+ }
+
+ public Set<ColumnTracker> getTrackedColumns() {
+ return this.trackedColumns;
+ }
+
+ /**
+ * Get a scanner on the columns that are needed by the index.
+ * <p>
+ * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+ * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+ * current state of the table for which we need to build an update).
+ * <p>
+ * If none of the passed columns matches any of the columns in the pending update (as determined
+ * by {@link ColumnReference#matchesFamily(byte[])} and
+ * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This
+ * is because it doesn't make sense to build index updates when there is no change in the table
+ * state for any of the columns you are indexing.
+ * <p>
+ * <i>NOTE:</i> This method should <b>not</b> be used during
+ * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+ * applied - you are merely attempting to clean up the current state and therefore do <i>not</i>
+ * need to track the indexed columns.
+ * <p>
+ * As a side-effect, we track the next-most-recent timestamp for the columns you request - you will
+ * never see a column at the timestamp we are tracking, only the next oldest timestamp for that
+ * column.
+ * @param indexedColumns the columns that will be indexed
+ * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+ * the builder. Even if no update is necessary for the requested columns, you still need
+ * to return the {@link IndexUpdate}, just don't set the update for the
+ * {@link IndexUpdate}.
+ * @throws IOException
+ */
+ public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+ Collection<? extends ColumnReference> indexedColumns) throws IOException {
+ ensureLocalStateInitialized(indexedColumns);
+ // filter out things with a newer timestamp and track the column references to which it applies
+ ColumnTracker tracker = new ColumnTracker(indexedColumns);
+ synchronized (this.trackedColumns) {
+ // we haven't seen this set of columns before, so we need to create a new tracker
+ if (!this.trackedColumns.contains(tracker)) {
+ this.trackedColumns.add(tracker);
+ }
+ }
+
+ Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+ return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+ }
+
+ /**
+ * Initialize the managed local state. Generally, this will only be called by
+ * {@link #getIndexedColumnsTableState(Collection)}, which is unlikely to be called concurrently from the outside. Even
+ * then, there is still fairly low contention as each new Put/Delete will have its own table state.
+ */
+ private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+ throws IOException {
+ // check to see if we haven't initialized any columns yet
+ Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+ // we have all the columns loaded, so we are good to go.
+ if (toCover.isEmpty()) { return; }
+
+ // add the current state of the row
+ this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false);
+
+ // add the covered columns to the set
+ for (ColumnReference ref : toCover) {
+ this.columnSet.addColumn(ref);
+ }
+ }
+
+ @Override
+ public Map<String, byte[]> getUpdateAttributes() {
+ return this.update.getAttributesMap();
+ }
+
+ @Override
+ public byte[] getCurrentRowKey() {
+ return this.update.getRow();
+ }
+
+ public Result getCurrentRowState() {
+ KeyValueScanner scanner = this.memstore.getScanner();
+ List<Cell> kvs = new ArrayList<Cell>();
+ while (scanner.peek() != null) {
+ try {
+ kvs.add(scanner.next());
+ } catch (IOException e) {
+ // this should never happen - something has gone terribly awry if it has
+ throw new RuntimeException("Local MemStore threw IOException!", e);
+ }
+ }
+ return Result.create(kvs);
+ }
+
+ /**
+ * Helper to add a {@link Mutation} to the values stored for the current row
+ *
+ * @param pendingUpdate
+ * update to apply
+ */
+ public void addUpdateForTesting(Mutation pendingUpdate) {
+ for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+ List<Cell> edits = e.getValue();
+ addUpdate(edits);
+ }
+ }
+
+ /**
+ * Set the hints to be returned by {@link #getIndexColumnHints()}.
+ *
+ * @param hints
+ * index column hints for the current update
+ */
+ public void setHints(List<? extends IndexedColumnGroup> hints) {
+ this.hints = hints;
+ }
+
+ @Override
+ public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+ return this.hints;
+ }
+
+ @Override
+ public Collection<Cell> getPendingUpdate() {
+ return this.kvs;
+ }
+
+ /**
+ * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
+ * apply them.
+ *
+ * @param update
+ * pending {@link KeyValue}s
+ */
+ public void setPendingUpdates(Collection<Cell> update) {
+ this.kvs.clear();
+ this.kvs.addAll(update);
+ }
+
+ /**
+ * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+ */
+ public void applyPendingUpdates() {
+ this.addUpdate(kvs);
+ }
+
+ /**
+ * Rollback all the given values from the underlying state.
+ *
+ * @param values
+ */
+ public void rollback(Collection<Cell> values) {
+ for (Cell kv : values) {
+ this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv));
+ }
+ }
+
+ @Override
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ throws IOException {
+ Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+ ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
+ return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
+ }
+
+ @Override
+ public Map<String, Object> getContext() {
+ return context;
+ }
}
\ No newline at end of file
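Taken together, the public methods above suggest the following flow for one timestamp-batch of cells from a
mutation. This is a sketch only: the helper name and parameters are illustrative, and the LocalHBaseState and
RegionCoprocessorEnvironment are assumed to be supplied by the surrounding index builder:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

public class LocalTableStateUsage {
    /**
     * Drive LocalTableState for one timestamp-batch of cells from a mutation:
     * stage the cells, read the indexed columns as of that timestamp, then
     * apply the pending cells so later batches see them.
     */
    static void processBatch(RegionCoprocessorEnvironment env, LocalHBaseState table,
            Mutation m, long batchTs, List<Cell> batchCells,
            List<ColumnReference> indexedColumns) throws IOException {
        LocalTableState state = new LocalTableState(env, table, m);
        state.setCurrentTimestamp(batchTs);
        // stage the cells without applying them, so cleanup (getIndexDeletes)
        // can still see the row as it was before this batch
        state.setPendingUpdates(batchCells);

        Pair<ValueGetter, IndexUpdate> update = state.getIndexUpdateState(indexedColumns);
        ValueGetter getter = update.getFirst();
        // ... a codec would use 'getter' here to assemble the index row key ...

        // fold the batch into the in-memory row state for subsequent batches
        state.applyPendingUpdates();
    }
}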