You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:47 UTC
[46/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/Indexer.java b/src/main/java/org/apache/hbase/index/Indexer.java
new file mode 100644
index 0000000..6f6ba6a
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/Indexer.java
@@ -0,0 +1,704 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import static org.apache.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.builder.IndexBuildManager;
+import org.apache.hbase.index.builder.IndexBuilder;
+import org.apache.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hbase.index.util.IndexManagementUtil;
+import org.apache.hbase.index.wal.IndexedKeyValue;
+import org.apache.hbase.index.write.IndexFailurePolicy;
+import org.apache.hbase.index.write.IndexWriter;
+import org.apache.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * <p>
+ * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to
+ * the WAL after the WALEdit has been saved. If any of the index updates fail, this server is
+ * immediately terminated and we rely on WAL replay to attempt the index updates again (see
+ * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}).
+ * <p>
+ * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made
+ * if the WAL is disabled - some or none of the index updates may be successful. All updates in a
+ * single batch must have the same durability level - either everything gets written to the WAL or
+ * nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
+ * want to have different durability levels, you only need to split the updates into two different
+ * batches.
+ */
+public class Indexer extends BaseRegionObserver {
+
+ private static final Log LOG = LogFactory.getLog(Indexer.class);
+
+ /** WAL on this server */
+ private HLog log;
+ /** Writer used for the regular (non-recovery) index updates; created in {@link #start}. */
+ protected IndexWriter writer;
+ /** Builds the index updates for each mutation; created in {@link #start}. */
+ protected IndexBuildManager builder;
+
+ /** Configuration key for the {@link IndexBuilder} to use */
+ public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+ // Set up our locking on the index edits/WAL so we can be sure that we don't roll a WAL edit
+ // before that edit has been applied to the index tables
+ private static final ReentrantReadWriteLock INDEX_READ_WRITE_LOCK = new ReentrantReadWriteLock(
+     true);
+ public static final ReadLock INDEX_UPDATE_LOCK = INDEX_READ_WRITE_LOCK.readLock();
+
+ /**
+  * Configuration key for whether the indexer should check the version of HBase that is running.
+  * Generally, you only want to ignore this for testing or for custom versions of HBase.
+  * <p>
+  * The key value (including its spelling) is read at runtime, so it must not be altered.
+  */
+ public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+ private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hbase.index.recovery.failurepolicy";
+
+ /**
+  * Marker {@link KeyValue} to indicate that we are doing a batch operation. Needed because the
+  * coprocessor framework throws away the WALEdit from the prePut/preDelete hooks when checking a
+  * batch if there were no {@link KeyValue}s attached to the {@link WALEdit}. When you get down to
+  * the preBatch hook, there won't be any WALEdits to which to add the index updates.
+  * <p>
+  * Compared by identity, so there must only ever be this one instance.
+  */
+ private static final KeyValue BATCH_MARKER = new KeyValue();
+
+ /**
+  * Cache of the failed updates to the various regions. Used for making the WAL recovery mechanisms
+  * more robust in the face of recovering index regions that were on the same server as the
+  * primary table region
+  */
+ private final PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+ /**
+  * IndexWriter for writing the recovered index edits. Separate from the main indexer since we need
+  * different write/failure policies
+  */
+ private IndexWriter recoveryWriter;
+
+ /**
+  * Set once by {@link #stop}; polled by the threads waiting in <tt>takeUpdateLock</tt>, so it must
+  * be volatile for the change to be visible to them.
+  */
+ private volatile boolean stopped;
+ /** Set in {@link #start} when the running HBase is missing a method this coprocessor needs. */
+ private boolean disabled;
+
+ /** Exposes the (private) recovery failure policy configuration key to tests. */
+ public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
+
+ /**
+  * Set up the index builder, the WAL roll synchronizer, the regular index writer and the recovery
+  * index writer for this region. If the running HBase lacks a required method, the coprocessor is
+  * marked as disabled instead.
+  * @param e environment of the region this coprocessor is attached to
+  * @throws IOException if the HBase version/configuration is rejected or the recovery failure
+  *           policy cannot be instantiated
+  */
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+   try {
+     final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+     final String serverName = env.getRegionServerServices().getServerName().getServerName();
+
+     // unless explicitly turned off, refuse to run on an unsupported version/config combination
+     final boolean checkVersion = env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true);
+     if (checkVersion) {
+       final String versionProblem =
+           Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+       if (versionProblem != null) {
+         final IOException failure = new IOException(versionProblem);
+         env.getRegionServerServices().abort(versionProblem, failure);
+         throw failure;
+       }
+     }
+
+     this.builder = new IndexBuildManager(env);
+
+     // grab the WAL and register a synchronizer so we don't archive a WAL that we still need
+     this.log = env.getRegionServerServices().getWAL();
+     this.log.registerWALActionsListener(
+         new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock()));
+
+     // writer for the regular index updates
+     this.writer = new IndexWriter(env, serverName + "-index-writer");
+
+     // writer for the recovered edits, which retries the failed edits
+     final TrackingParallelWriterIndexCommitter committer =
+         new TrackingParallelWriterIndexCommitter();
+     try {
+       // the failure policy is only ever overridden in tests, but it has to be looked up here
+       final Class<? extends IndexFailurePolicy> policyType =
+           env.getConfiguration().getClass(INDEX_RECOVERY_FAILURE_POLICY_KEY,
+             StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
+       final IndexFailurePolicy failurePolicy =
+           policyType.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
+       LOG.debug("Setting up recovery writter with committer: " + committer.getClass()
+           + " and failure policy: " + failurePolicy.getClass());
+       this.recoveryWriter =
+           new IndexWriter(committer, failurePolicy, env, serverName + "-recovery-writer");
+     } catch (Exception ex) {
+       throw new IOException("Could not instantiate recovery failure policy!", ex);
+     }
+   } catch (NoSuchMethodError ex) {
+     // a required method is missing from this HBase - fall back to doing nothing
+     disabled = true;
+     super.start(e);
+     LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+   }
+ }
+
+ /**
+  * Stop the builder and both index writers. Calling this again after a successful stop is a no-op;
+  * a disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void stop(CoprocessorEnvironment e) throws IOException {
+   if (!this.stopped) {
+     if (this.disabled) {
+       // indexing was never set up, so there is nothing of ours to shut down
+       super.stop(e);
+     } else {
+       this.stopped = true;
+       final String reason = "Indexer is being stopped";
+       this.builder.stop(reason);
+       this.writer.stop(reason);
+       this.recoveryWriter.stop(reason);
+     }
+   }
+ }
+
+ /**
+  * Tag the {@link WALEdit} of the put with the batch marker so the edit is still available in the
+  * batch processing step. A disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
+     final WALEdit edit, final boolean writeToWAL) throws IOException {
+   if (!this.disabled) {
+     // Any exception is allowed to propagate from here: something terrible has happened.
+     edit.add(BATCH_MARKER);
+     return;
+   }
+   super.prePut(c, put, edit, writeToWAL);
+ }
+
+ /**
+  * Run {@link #preDeleteWithExceptions} and funnel any failure through
+  * <tt>rethrowIndexingException</tt>. A disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
+     WALEdit edit, boolean writeToWAL) throws IOException {
+   if (!this.disabled) {
+     try {
+       preDeleteWithExceptions(e, delete, edit, writeToWAL);
+     } catch (Throwable failure) {
+       rethrowIndexingException(failure);
+       // only reached if the failure was, unexpectedly, not rethrown
+       throw new RuntimeException(
+           "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+     }
+     return;
+   }
+   super.preDelete(e, delete, edit, writeToWAL);
+ }
+
+ /**
+  * Handle a delete before it is applied: a delete that belongs to a batch only gets the batch
+  * marker, any other delete has its index updates built and handed to <tt>doPre</tt>.
+  * @throws Exception on any failure while building or writing the index updates
+  */
+ public void preDeleteWithExceptions(ObserverContext<RegionCoprocessorEnvironment> e,
+     Delete delete, WALEdit edit, boolean writeToWAL) throws Exception {
+   final boolean partOfBatch = this.builder.getBatchId(delete) != null;
+   if (partOfBatch) {
+     // the marker keeps the WALEdit around until the batch hook runs
+     edit.add(BATCH_MARKER);
+   } else {
+     // mapping of index mutation -> target index table
+     final Collection<Pair<Mutation, byte[]>> updates = this.builder.getIndexUpdate(delete);
+     final boolean walUpdated = doPre(updates, edit, writeToWAL);
+     if (walUpdated) {
+       takeUpdateLock("delete");
+     }
+   }
+ }
+
+ /**
+  * Run {@link #preBatchMutateWithExceptions} and funnel any failure through
+  * <tt>rethrowIndexingException</tt>. A disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+     MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+   if (!this.disabled) {
+     try {
+       preBatchMutateWithExceptions(c, miniBatchOp);
+     } catch (Throwable failure) {
+       rethrowIndexingException(failure);
+       // only reached if the failure was, unexpectedly, not rethrown
+       throw new RuntimeException(
+           "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+     }
+     return;
+   }
+   super.preBatchMutate(c, miniBatchOp);
+ }
+
+ /**
+  * Build the index updates for a whole mini-batch. Strips the batch marker from each non-null
+  * {@link WALEdit}, merges the index-enabled mutations of the batch into one {@link MultiMutation}
+  * per row, asks the builder for the resulting index updates and passes them to <tt>doPre</tt>
+  * together with the first WALEdit of the batch. Takes the index update lock when <tt>doPre</tt>
+  * reports that the WAL edit was updated.
+  * @param c coprocessor context (not used directly here)
+  * @param miniBatchOp the batch of mutations about to be applied
+  * @throws Throwable on any failure; the caller is responsible for translating it
+  */
+ @SuppressWarnings("deprecation")
+ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws Throwable {
+
+ // first group all the updates for a single row into a single update to be processed
+ Map<ImmutableBytesPtr, MultiMutation> mutations =
+ new HashMap<ImmutableBytesPtr, MultiMutation>();
+ boolean durable = false;
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ // remove the batch keyvalue marker - it's added for all puts
+ WALEdit edit = miniBatchOp.getWalEdit(i);
+ // we don't have a WALEdit for immutable index cases, which still see this path
+ // we could check if indexing is enabled for the mutation in prePut and then just skip this
+ // after checking here, but this saves us the checking again.
+ if (edit != null) {
+ // NOTE(review): assumes the marker is always the first KeyValue of the edit; with assertions
+ // disabled whatever is at index 0 is removed unchecked - confirm
+ KeyValue kv = edit.getKeyValues().remove(0);
+ assert kv == BATCH_MARKER : "Expected batch marker from the WALEdit, but got: " + kv;
+ }
+ Pair<Mutation, Integer> op = miniBatchOp.getOperation(i);
+ Mutation m = op.getFirst();
+ // skip this mutation if we aren't enabling indexing
+ // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+ // should be indexed, which means we need to expose another method on the builder. Such is the
+ // way optimization go though.
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+
+ // figure out if this batch is durable or not: a single indexed mutation that does not skip
+ // the WAL makes the whole batch durable
+ if(!durable){
+ durable = m.getDurability() != Durability.SKIP_WAL;
+ }
+
+ // add the mutation to the batch set
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ MultiMutation stored = mutations.get(row);
+ // we haven't seen this row before, so add it
+ if (stored == null) {
+ stored = new MultiMutation(row, m.getWriteToWAL());
+ mutations.put(row, stored);
+ }
+ stored.addAll(m);
+ }
+
+ // early exit if it turns out we don't have any edits
+ if (mutations.entrySet().size() == 0) {
+ return;
+ }
+
+ // dump all the index updates into a single WAL. They will get combined in the end anyways, so
+ // don't worry which one we get
+ // NOTE(review): the loop above tolerates null WALEdits; if the edit at index 0 is null while
+ // the batch is durable, doPre would dereference it - confirm this cannot happen
+ WALEdit edit = miniBatchOp.getWalEdit(0);
+
+ // get the index updates for all elements in this batch
+ Collection<Pair<Mutation, byte[]>> indexUpdates =
+ this.builder.getIndexUpdate(miniBatchOp, mutations.values());
+ // write them
+ if (doPre(indexUpdates, edit, durable)) {
+ takeUpdateLock("batch mutation");
+ }
+ }
+
+ /**
+  * Block until the index update read lock is held by the current thread. The lock is released
+  * again by the matching post-hook once the index updates have been written.
+  * <p>
+  * Interrupts while waiting are ignored (the interrupt status is restored before returning or
+  * throwing). If the indexer is stopped before or right after the lock is obtained, the lock is
+  * <b>not</b> held on exit and the update attempt is failed instead, so that callers never go on
+  * to release a lock they do not own.
+  * @param opDesc description of the operation taking the lock, only used for logging/messages
+  * @throws IndexBuildingFailureException if the indexer has been stopped
+  */
+ private void takeUpdateLock(String opDesc) throws IndexBuildingFailureException {
+   boolean interrupted = false;
+   boolean locked = false;
+   // lock the log, so we are sure that index write gets atomically committed
+   LOG.debug("Taking INDEX_UPDATE readlock for " + opDesc);
+   try {
+     // wait for the update lock
+     while (!this.stopped) {
+       try {
+         INDEX_UPDATE_LOCK.lockInterruptibly();
+         LOG.debug("Got the INDEX_UPDATE readlock for " + opDesc);
+         // unlock the lock so the server can shutdown, if we find that we have stopped since
+         // getting the lock
+         if (this.stopped) {
+           INDEX_UPDATE_LOCK.unlock();
+           throw new IndexBuildingFailureException(
+               "Found server stop after obtaining the update lock, killing update attempt");
+         }
+         locked = true;
+         break;
+       } catch (InterruptedException e) {
+         LOG.info("Interrupted while waiting for update lock. Ignoring unless stopped");
+         interrupted = true;
+       }
+     }
+   } finally {
+     // restore the interrupt status on every exit path, including the failure ones
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+   // the loop was left because of a stop, not because we got the lock: fail the update rather
+   // than letting the caller believe the lock is held
+   if (!locked) {
+     throw new IndexBuildingFailureException(
+         "Found server stop before obtaining the update lock for " + opDesc
+             + ", killing update attempt");
+   }
+ }
+
+ /**
+  * A {@link Mutation} that accumulates the key-values and attributes of several mutations to the
+  * same row, so the batch can be handed to the index builder as one update per row. It cannot be
+  * serialized. Two instances are equal when their row keys are equal.
+  */
+ private class MultiMutation extends Mutation {
+
+   private final ImmutableBytesPtr rowKey;
+
+   /**
+    * @param rowkey row all the merged mutations apply to
+    * @param writeToWal initial write-to-WAL flag; switched on later if any merged mutation has it
+    */
+   public MultiMutation(ImmutableBytesPtr rowkey, boolean writeToWal) {
+     this.rowKey = rowkey;
+     this.writeToWAL = writeToWal;
+   }
+
+   /**
+    * Merge the given mutation into this one: its key-values are appended per family, its
+    * attributes are copied unless an attribute of that name is already set, and the write-to-WAL
+    * flag is switched on if the given mutation has it on.
+    * @param stored mutation to merge in
+    */
+   @SuppressWarnings("deprecation")
+   public void addAll(Mutation stored) {
+     // add all the kvs
+     for (Entry<byte[], List<KeyValue>> kvs : stored.getFamilyMap().entrySet()) {
+       byte[] family = kvs.getKey();
+       List<KeyValue> list = getKeyValueList(family, kvs.getValue().size());
+       list.addAll(kvs.getValue());
+       familyMap.put(family, list);
+     }
+
+     // add all the attributes, not overriding already stored ones
+     for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
+       if (this.getAttribute(attrib.getKey()) == null) {
+         this.setAttribute(attrib.getKey(), attrib.getValue());
+       }
+     }
+     if (stored.getWriteToWAL()) {
+       this.writeToWAL = true;
+     }
+   }
+
+   /**
+    * @return the list already stored for the family, or a new list (not yet stored in the family
+    *         map) with the given capacity hint
+    */
+   private List<KeyValue> getKeyValueList(byte[] family, int hint) {
+     List<KeyValue> list = familyMap.get(family);
+     if (list == null) {
+       list = new ArrayList<KeyValue>(hint);
+     }
+     return list;
+   }
+
+   @Override
+   public byte[] getRow(){
+     return this.rowKey.copyBytesIfNecessary();
+   }
+
+   @Override
+   public int hashCode() {
+     return this.rowKey.hashCode();
+   }
+
+   /**
+    * Equal only to another {@link MultiMutation} for the same row key. Comparing hash codes alone
+    * would make any unrelated object with a colliding hash "equal" and is not symmetric.
+    */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (!(o instanceof MultiMutation)) {
+       return false;
+     }
+     return this.rowKey.equals(((MultiMutation) o).rowKey);
+   }
+
+   @Override
+   public void readFields(DataInput arg0) throws IOException {
+     throw new UnsupportedOperationException("MultiMutations cannot be read/written");
+   }
+
+   @Override
+   public void write(DataOutput arg0) throws IOException {
+     throw new UnsupportedOperationException("MultiMutations cannot be read/written");
+   }
+ }
+
+ /**
+ * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled
+ * @return <tt>true</tt> if the WAL has been updated.
+ * @throws IOException
+ */
+ private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
+ final boolean writeToWAL) throws IOException {
+ // no index updates, so we are done
+ if (indexUpdates == null || indexUpdates.size() == 0) {
+ return false;
+ }
+
+ // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
+ // update right away
+ if (!writeToWAL) {
+ try {
+ this.writer.write(indexUpdates);
+ return false;
+ } catch (Throwable e) {
+ LOG.error("Failed to update index with entries:" + indexUpdates, e);
+ IndexManagementUtil.rethrowIndexingException(e);
+ }
+ }
+
+ // we have all the WAL durability, so we just update the WAL entry and move on
+ for (Pair<Mutation, byte[]> entry : indexUpdates) {
+ edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
+ }
+
+ return true;
+ }
+
+ /**
+  * Complete the index update for a put via <tt>doPost</tt>. A disabled coprocessor only delegates
+  * to the parent.
+  */
+ @Override
+ public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+     boolean writeToWAL) throws IOException {
+   if (!this.disabled) {
+     doPost(edit, put, writeToWAL);
+     return;
+   }
+   super.postPut(e, put, edit, writeToWAL);
+ }
+
+ /**
+  * Complete the index update for a delete via <tt>doPost</tt>. A disabled coprocessor only
+  * delegates to the parent.
+  */
+ @Override
+ public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
+     WALEdit edit, boolean writeToWAL) throws IOException {
+   if (!this.disabled) {
+     doPost(edit, delete, writeToWAL);
+     return;
+   }
+   super.postDelete(e, delete, edit, writeToWAL);
+ }
+
+ /**
+  * Tell the builder that the batch has completed. The index writes themselves are not done here;
+  * they are handled by the first post put/delete call. A disabled coprocessor only delegates to
+  * the parent.
+  */
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+     MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+   if (!this.disabled) {
+     this.builder.batchCompleted(miniBatchOp);
+     return;
+   }
+   super.postBatchMutate(c, miniBatchOp);
+ }
+
+ /**
+  * Run <tt>doPostWithExceptions</tt> and funnel any failure through
+  * <tt>rethrowIndexingException</tt>.
+  */
+ private void doPost(WALEdit edit, Mutation m, boolean writeToWAL) throws IOException {
+   try {
+     doPostWithExceptions(edit, m, writeToWAL);
+   } catch (Throwable failure) {
+     rethrowIndexingException(failure);
+     // only reached if the failure was, unexpectedly, not rethrown
+     throw new RuntimeException(
+         "Somehow didn't complete the index update, but didn't return succesfully either!");
+   }
+ }
+
+ /**
+  * Write the index updates that were stashed in the {@link WALEdit} by the pre-hook and release
+  * the index update lock. Does nothing when the update skipped the WAL, when indexing is not
+  * enabled for the mutation, or when the edit carries no {@link IndexedKeyValue}. The write and
+  * the unlock happen only once per WALEdit: the first {@link IndexedKeyValue} is used as the
+  * "batch finished" flag.
+  * @param edit WAL edit of the mutation, possibly carrying index updates
+  * @param m the primary table mutation
+  * @param writeToWAL <tt>false</tt> if the mutation skipped the WAL
+  * @throws Exception on any failure while writing the index updates
+  */
+ private void doPostWithExceptions(WALEdit edit, Mutation m, boolean writeToWAL) throws Exception {
+ //short circuit, if we don't need to do any work
+ if (!writeToWAL || !this.builder.isEnabled(m)) {
+ // already did the index update in prePut, so we are done
+ return;
+ }
+
+ // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
+ // and then do it again later when getting out the index updates. This should be pretty minor
+ // though, compared to the rest of the runtime
+ IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
+ /*
+ * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
+ * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
+ * no index updates.
+ */
+ if (ikv == null) {
+ return;
+ }
+
+ /*
+ * only write the update if we haven't already seen this batch. We only want to write the batch
+ * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
+ * lead to writing all the index updates for each Put/Delete).
+ */
+ if (!ikv.getBatchFinished()) {
+ Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
+
+ // the WAL edit is kept in memory and we already specified the factory when we created the
+ // references originally, so the references already carry what the writer needs.
+ // NOTE(review): the original remark about passing in a null factory looks stale - no factory
+ // argument is passed to the writer here
+ try {
+ writer.writeAndKillYourselfOnFailure(indexUpdates);
+ } finally {
+ // With a custom kill policy, we may throw instead of kill the server.
+ // Without doing this in a finally block (at least with the mini cluster),
+ // the region server never goes down.
+
+ // mark the batch as having been written. In the single-update case, this never gets checked
+ // again, but in the batch case, we will check it again (see above).
+ ikv.markBatchFinished();
+
+ // release the lock on the index, whether or not the write succeeded
+ // we took the lock for each Put/Delete, so we have to release it a matching number of times
+ // batch cases only take the lock once, so we need to make sure we don't over-release the
+ // lock.
+ // NOTE(review): assumes the current thread still holds the read lock taken in the pre-hook -
+ // confirm for all paths
+ LOG.debug("Releasing INDEX_UPDATE readlock");
+ INDEX_UPDATE_LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+  * Find the first {@link IndexedKeyValue} stored in the given {@link WALEdit}.
+  * @param edit {@link WALEdit} to scan, in order
+  * @return the first {@link IndexedKeyValue} found, or <tt>null</tt> if the edit contains none
+  */
+ private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
+   IndexedKeyValue first = null;
+   for (KeyValue candidate : edit.getKeyValues()) {
+     if (!(candidate instanceof IndexedKeyValue)) {
+       continue;
+     }
+     first = (IndexedKeyValue) candidate;
+     break;
+   }
+   return first;
+ }
+
+ /**
+ * Extract the index updates from the WAL Edit
+ * @param edit to search for index updates
+ * @return the mutations to apply to the index tables
+ */
+ private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+ for (KeyValue kv : edit.getKeyValues()) {
+ if (kv instanceof IndexedKeyValue) {
+ IndexedKeyValue ikv = (IndexedKeyValue) kv;
+ indexUpdates.add(new Pair<Mutation, byte[]>(ikv.getMutation(), ikv.getIndexTable()));
+ }
+ }
+
+ return indexUpdates;
+ }
+
+ /**
+  * After the region has opened, retry the index updates that were cached for this region because
+  * they failed during WAL replay. Nothing is written (or logged) when there are no such updates.
+  * A disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
+   // fetched before the disabled check, exactly as before, in case fetching has side effects
+   Multimap<HTableInterfaceReference, Mutation> updates =
+       failedIndexEdits.getEdits(c.getEnvironment().getRegion());
+
+   if (this.disabled) {
+     super.postOpen(c);
+     return;
+   }
+
+   // if we have no pending edits to complete, then we are done
+   if (updates == null || updates.size() == 0) {
+     return;
+   }
+
+   // only announce a replay when there really is something to replay
+   LOG.info("Found some outstanding index updates that didn't succeed during"
+       + " WAL replay - attempting to replay now.");
+
+   // do the usual writer stuff, killing the server again, if we can't manage to make the index
+   // writes succeed again
+   try {
+     writer.writeAndKillYourselfOnFailure(updates);
+   } catch (IOException e) {
+     LOG.error("Exception thrown instead of killing server during index writing", e);
+   }
+ }
+
+ /**
+  * During WAL replay, pull the index updates out of the replayed edit and write them with the
+  * recovery writer. A disabled coprocessor only delegates to the parent.
+  */
+ @Override
+ public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
+     HLogKey logKey, WALEdit logEdit) throws IOException {
+   if (!this.disabled) {
+     // TODO check the regions in transition. If the server on which the region lives is this
+     // one, then we should retry that write later in postOpen.
+     // We might be able to get even smarter here and pre-split the edits that are server-local
+     // into their own recovered.edits file. This then lets us do a straightforward recovery of
+     // each region (and more efficiently as we aren't writing quite as hectically from this one
+     // place).
+
+     /*
+      * Basically, we let the index regions recover for a little while longer before retrying in
+      * the hopes they come up before the primary table finishes.
+      */
+     final Collection<Pair<Mutation, byte[]>> recovered = extractIndexUpdate(logEdit);
+     recoveryWriter.writeAndKillYourselfOnFailure(recovered);
+     return;
+   }
+   super.preWALRestore(env, info, logKey, logEdit);
+ }
+
+ /**
+  * Intended hook for a custom compaction {@link InternalScanner} that tracks the versions of rows
+  * that are removed, so they can be cleaned up from the index table(s).
+  * <p>
+  * This is not yet implemented and currently just delegates to the parent - it's not clear if we
+  * should even mess around with the index table for these rows as those points still existed.
+  * TODO: v2 of indexing
+  */
+ @Override
+ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+     Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+     InternalScanner s) throws IOException {
+   final InternalScanner delegate =
+       super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
+   return delegate;
+ }
+
+ /**
+  * Test-only accessor.
+  * @return the index builder currently held by the build manager
+  */
+ public IndexBuilder getBuilderForTesting() {
+   final IndexBuilder current = this.builder.getBuilderForTesting();
+   return current;
+ }
+
+ /**
+ * Validate that the version and configuration parameters are supported
+ * @param hBaseVersion current version of HBase on which <tt>this</tt> coprocessor is installed
+ * @param conf configuration to check for allowed parameters (e.g. WAL Compression only if >=
+ * 0.94.9)
+ */
+ public static String validateVersion(String hBaseVersion, Configuration conf) {
+ String[] versions = hBaseVersion.split("[.]");
+ if (versions.length < 3) {
+ return "HBase version could not be read, expected three parts, but found: "
+ + Arrays.toString(versions);
+ }
+
+ if (versions[1].equals("94")) {
+ String pointVersion = versions[2];
+ //remove -SNAPSHOT if applicable
+ int snapshot = pointVersion.indexOf("-");
+ if(snapshot > 0){
+ pointVersion = pointVersion.substring(0, snapshot);
+ }
+ // less than 0.94.9, so we need to check if WAL Compression is enabled
+ if (Integer.parseInt(pointVersion) < 9) {
+ if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) {
+ return
+ "Indexing not supported with WAL Compression for versions of HBase older than 0.94.9 - found version:"
+ + Arrays.toString(versions);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Enable indexing on the given table
+  * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
+  * @param builder class to use when building the index for this table
+  * @param properties map of custom configuration options to make available to your
+  *          {@link IndexBuilder} on the server-side; may be <tt>null</tt>. The map is copied and
+  *          never modified, so unmodifiable maps are accepted too.
+  * @throws IOException the Indexer coprocessor cannot be added
+  */
+ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder,
+     Map<String, String> properties) throws IOException {
+   // work on a private copy so the caller's map is left untouched
+   Map<String, String> coprocessorProperties = new HashMap<String, String>();
+   if (properties != null) {
+     coprocessorProperties.putAll(properties);
+   }
+   coprocessorProperties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
+   desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER,
+       coprocessorProperties);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/ValueGetter.java b/src/main/java/org/apache/hbase/index/ValueGetter.java
new file mode 100644
index 0000000..2600bf8
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/ValueGetter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import java.io.IOException;
+
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Looks up the stored value for a column.
+ */
+public interface ValueGetter {
+
+  /**
+   * Get the most recent value (the one with the largest timestamp) for the given column reference.
+   * @param ref column to look up; the underlying key values are matched against it via
+   *          {@link ColumnReference#matches}
+   * @return the stored value for the given {@link ColumnReference}, or <tt>null</tt> if no value
+   *         is present.
+   * @throws IOException if there is an error accessing the underlying data storage
+   */
+  ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java b/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
new file mode 100644
index 0000000..d0a8624
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.covered.CoveredColumnsIndexBuilder;
+
+/**
+ * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
+ * <p>
+ * You should extend this class, rather than implementing IndexBuilder directly, to maintain
+ * compatibility going forward.
+ * <p>
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g.
+ * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
+ * up-to-date.
+ */
+public abstract class BaseIndexBuilder implements IndexBuilder {
+
+ private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
+ protected boolean stopped;
+
+ @Override
+ public void extendBaseIndexBuilderInstead() { }
+
+ @Override
+ public void setup(RegionCoprocessorEnvironment conf) throws IOException {
+ // noop
+ }
+
+ @Override
+ public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ // noop
+ }
+
+ @Override
+ public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+ // noop
+ }
+
+ /**
+ * By default, we always attempt to index the mutation. Commonly this can be slow (because the
+ * framework spends the time to do the indexing, only to realize that you don't need it) or not
+ * ideal (if you want to turn on/off indexing on a table without completely reloading it).
+ * @throws IOException
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * By default, returns <tt>null</tt>, i.e. assumes that mutations should <b>not be batched</b>.
+ * That is to say, each mutation is treated as applying to different rows, even if they are in
+ * the same batch, or as an independent update.
+ */
+ @Override
+ public byte[] getBatchId(Mutation m) {
+ return null;
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.debug("Stopping because: " + why);
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java b/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
new file mode 100644
index 0000000..c5ebafe
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.Indexer;
+import org.apache.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+
+/**
+ * Manage the building of index updates from primary table updates.
+ * <p>
+ * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
+ * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
+ */
+public class IndexBuildManager implements Stoppable {
+
+ private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
+ private final IndexBuilder delegate;
+ private QuickFailingTaskRunner pool;
+ private boolean stopped;
+
+ /**
+ * Set the number of threads with which we can concurrently build index updates. Unused threads
+ * will be released, but setting the number of threads too high could cause frequent swapping and
+ * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
+ * of time building index updates, it could be worthwhile to spend the time to tune this parameter
+ * as it could lead to dramatic increases in speed.
+ */
+ public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
+ /** Default to 10 threads (a single thread would be the safest choice, but also the slowest) */
+ private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
+ /**
+ * Configuration key for the amount of time to keep idle threads in the pool. After this time
+ * (seconds) we expire the threads and will re-create them as needed, up to the configured max
+ */
+ private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
+ "index.builder.threads.keepalivetime";
+
+ /**
+ * @param env environment in which <tt>this</tt> is running. Used to setup the
+ * {@link IndexBuilder} and executor
+ * @throws IOException if an {@link IndexBuilder} cannot be correctly set up
+ */
+ public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
+ this(getIndexBuilder(env), new QuickFailingTaskRunner(ThreadPoolManager.getExecutor(
+ getPoolBuilder(env), env)));
+ }
+
+ private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
+ Configuration conf = e.getConfiguration();
+ Class<? extends IndexBuilder> builderClass =
+ conf.getClass(Indexer.INDEX_BUILDER_CONF_KEY, null, IndexBuilder.class);
+ try {
+ IndexBuilder builder = builderClass.newInstance();
+ builder.setup(e);
+ return builder;
+ } catch (InstantiationException e1) {
+ throw new IOException("Couldn't instantiate index builder:" + builderClass
+ + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+ } catch (IllegalAccessException e1) {
+ throw new IOException("Couldn't instantiate index builder:" + builderClass
+ + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+ }
+ }
+
+ private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
+ String serverName = env.getRegionServerServices().getServerName().getServerName();
+ return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
+ setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
+ setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
+ DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
+ }
+
+ public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
+ this.delegate = builder;
+ this.pool = pool;
+ }
+
+
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+ MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp,
+ Collection<? extends Mutation> mutations) throws Throwable {
+ // notify the delegate that we have started processing a batch
+ this.delegate.batchStarted(miniBatchOp);
+
+ // parallelize each mutation into its own task
+ // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
+ // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
+ // acknowledged on each thread before doing the actual lookup, but after that depends on the
+ // underlying builder to look for the closed flag.
+ TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
+ new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
+ for (final Mutation m : mutations) {
+ tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
+
+ @Override
+ public Collection<Pair<Mutation, byte[]>> call() throws IOException {
+ return delegate.getIndexUpdate(m);
+ }
+
+ });
+ }
+ List<Collection<Pair<Mutation, byte[]>>> allResults = null;
+ try {
+ allResults = pool.submitUninterruptible(tasks);
+ } catch (CancellationException e) {
+ throw e;
+ } catch (ExecutionException e) {
+ LOG.error("Found a failed index update!");
+ throw e.getCause();
+ }
+
+ // we can only get here if we get successes from each of the tasks, so each of these must have a
+ // correct result
+ Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
+ for (Collection<Pair<Mutation, byte[]>> result : allResults) {
+ assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
+ results.addAll(result);
+ }
+
+ return results;
+ }
+
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
+ // all we get is a single update, so it would probably just go slower if we needed to queue it
+ // up. It will increase underlying resource contention a little bit, but the mutation case is
+ // far more common, so let's not worry about it for now.
+ // short circuit so we don't waste time.
+ if (!this.delegate.isEnabled(delete)) {
+ return null;
+ }
+
+ return delegate.getIndexUpdate(delete);
+
+ }
+
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+ Collection<KeyValue> filtered) throws IOException {
+ // this is run async, so we can take our time here
+ return delegate.getIndexUpdateForFilteredRows(filtered);
+ }
+
+ public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+ delegate.batchCompleted(miniBatchOp);
+ }
+
+ public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp)
+ throws IOException {
+ delegate.batchStarted(miniBatchOp);
+ }
+
+ public boolean isEnabled(Mutation m) throws IOException {
+ return delegate.isEnabled(m);
+ }
+
+ public byte[] getBatchId(Mutation m) {
+ return delegate.getBatchId(m);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (stopped) {
+ return;
+ }
+ this.stopped = true;
+ this.delegate.stop(why);
+ this.pool.stop(why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ public IndexBuilder getBuilderForTesting() {
+ return this.delegate;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java b/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
new file mode 100644
index 0000000..b5ad1db
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.Indexer;
+
+/**
+ * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
+ * updates.
+ * <p>
+ * Either all the index updates will be applied to all tables or the primary table will kill itself
+ * and will attempt to replay the index edits through the WAL replay mechanism.
+ */
+public interface IndexBuilder extends Stoppable {
+
+ /** Helper method signature to ensure people don't attempt to implement this interface directly */
+ public void extendBaseIndexBuilderInstead();
+
+ /**
+ * This is always called exactly once on install of {@link Indexer}, before any calls to
+ * {@link #getIndexUpdate}.
+ * @param env in which the builder is running
+ * @throws IOException on failure to setup the builder
+ */
+ public void setup(RegionCoprocessorEnvironment env) throws IOException;
+
+ /**
+ * Your opportunity to update any/all index tables based on the update of the primary table row.
+ * It's up to your implementation to ensure that timestamps match between the primary and index
+ * tables.
+ * <p>
+ * The mutation is a generic mutation (not a {@link Put} or a {@link Delete}), as it actually
+ * corresponds to a batch update. It's important to note that {@link Put}s always go through the
+ * batch update code path, so a single {@link Put} will come through here and update the primary
+ * table as the only update in the mutation.
+ * <p>
+ * Implementers must ensure that this method is thread-safe - it could (and probably will) be
+ * called concurrently for different mutations, which may or may not be part of the same batch.
+ * @param mutation update to the primary table to be indexed.
+ * @return a Collection of pairs of the mutation to make -> target index table name
+ * @throws IOException on failure
+ */
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
+
+ /**
+ * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
+ * index tables based on the delete of the primary table row. This is only called for cases where
+ * the client sends a single delete ({@link HTable#delete}). We separate this method from
+ * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
+ * subtly different semantics for updating the families/timestamps from the generic batch path.
+ * <p>
+ * It's up to your implementation to ensure that timestamps match between the primary and index
+ * tables.
+ * <p>
+ * Implementers must ensure that this method is thread-safe - it could (and probably will) be
+ * called concurrently for different mutations, which may or may not be part of the same batch.
+ * @param delete {@link Delete} to the primary table that may be indexed
+ * @return a {@link Collection} of {@link Pair}s of the mutation to make -> target index table name
+ * @throws IOException on failure
+ */
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
+
+ /**
+ * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal
+ * flush or compaction mechanisms.
+ * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
+ * output from HBase.
+ * @return a {@link Collection} of {@link Pair}s of the mutation to make -> target index table name
+ * @throws IOException on failure
+ */
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+ Collection<KeyValue> filtered)
+ throws IOException;
+
+ /**
+ * Notification that a batch of updates has successfully been written.
+ * @param miniBatchOp the full batch operation that was written
+ */
+ public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp);
+
+ /**
+ * Notification that a batch has been started.
+ * <p>
+ * Unfortunately, the way HBase has the coprocessor hooks setup, this is actually called
+ * <i>after</i> the {@link #getIndexUpdate} methods. Therefore, you will likely need an attribute
+ * on your {@link Put}/{@link Delete} to indicate it is a batch operation.
+ * @param miniBatchOp the full batch operation to be written
+ * @throws IOException
+ */
+ public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+ /**
+ * This allows the codec to dynamically change whether or not indexing should take place for a
+ * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
+ * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
+ * <p>
+ * We can also be smart about even indexing a given update here too - if the update doesn't
+ * contain any columns that we care about indexing, we can save the effort of analyzing the put
+ * and any further processing.
+ * @param m mutation that should be indexed.
+ * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
+ * basis, as each codec is instantiated per-region.
+ * @throws IOException
+ */
+ public boolean isEnabled(Mutation m) throws IOException;
+
+ /**
+ * @param m mutation that has been received by the indexer and is waiting to be indexed
+ * @return the ID of batch to which the Mutation belongs, or <tt>null</tt> if the mutation is not
+ * part of a batch.
+ */
+ public byte[] getBatchId(Mutation m);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java b/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
new file mode 100644
index 0000000..5fd40a8
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Unexpected failure while building index updates that wasn't caused by an {@link IOException}.
+ * This should be used if there is some basic issue with indexing - and no amount of retries will
+ * fix it.
+ */
+@SuppressWarnings("serial")
+public class IndexBuildingFailureException extends DoNotRetryIOException {
+
+ /**
+ * Constructor for over the wire propagation. Generally, shouldn't be used since index failure
+ * should have an underlying cause to propagate.
+ * @param msg reason for the failure
+ */
+ public IndexBuildingFailureException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg reason
+ * @param cause underlying cause for the failure
+ */
+ public IndexBuildingFailureException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/Batch.java b/src/main/java/org/apache/hbase/index/covered/Batch.java
new file mode 100644
index 0000000..a89fc49
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/Batch.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * A collection of {@link KeyValue KeyValues} to the primary table
+ */
+public class Batch {
+
+ private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+ private final long timestamp;
+ private List<KeyValue> batch = new ArrayList<KeyValue>();
+ private boolean allPointDeletes = true;
+
+ /**
+ * @param ts timestamp associated with this batch
+ */
+ public Batch(long ts) {
+ this.timestamp = ts;
+ }
+
+ public void add(KeyValue kv){
+ if (pointDeleteCode != kv.getType()) {
+ allPointDeletes = false;
+ }
+ batch.add(kv);
+ }
+
+ public boolean isAllPointDeletes() {
+ return allPointDeletes;
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public List<KeyValue> getKvs() {
+ return this.batch;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java b/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
new file mode 100644
index 0000000..4c33391
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hbase.index.covered.update.ColumnReference;
+
+/**
+ * Manage a set of {@link ColumnReference}s for the {@link LocalTableState}.
+ */
+public class CoveredColumns {
+
+ Set<ColumnReference> columns = new HashSet<ColumnReference>();
+
+ public Collection<? extends ColumnReference> findNonCoveredColumns(
+ Collection<? extends ColumnReference> columns2) {
+ List<ColumnReference> uncovered = new ArrayList<ColumnReference>();
+ for (ColumnReference column : columns2) {
+ if (!columns.contains(column)) {
+ uncovered.add(column);
+ }
+ }
+ return uncovered;
+ }
+
+ public void addColumn(ColumnReference column) {
+ this.columns.add(column);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java b/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
new file mode 100644
index 0000000..9a3e4d3
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
@@ -0,0 +1,490 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import org.apache.hbase.index.builder.BaseIndexBuilder;
+import org.apache.hbase.index.covered.data.LocalHBaseState;
+import org.apache.hbase.index.covered.data.LocalTable;
+import org.apache.hbase.index.covered.update.ColumnTracker;
+import org.apache.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.hbase.index.covered.update.IndexedColumnGroup;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
+ * need to do any extra synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or
+ * flush, leading to a bloated index that needs to be cleaned up by a background process.
+ */
+public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
+
+ // logger used for the debug dumps of the generated index updates
+ private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
+ /** Configuration key under which the {@link IndexCodec} implementation class is looked up. */
+ public static final String CODEC_CLASS_NAME_KEY = "org.apache.hbase.index.codec.class";
+
+ /** Coprocessor environment of the region; assigned in {@link #setup(RegionCoprocessorEnvironment)}. */
+ protected RegionCoprocessorEnvironment env;
+ /** Codec asked for the index deletes/upserts and for whether indexing is enabled. */
+ protected IndexCodec codec;
+ /** Local table view handed to each {@link LocalTableState} created by this builder. */
+ protected LocalHBaseState localTable;
+
+  /**
+   * Prepare the builder for the given region: instantiate and initialize the {@link IndexCodec}
+   * configured under {@link #CODEC_CLASS_NAME_KEY}, then create the local table view.
+   * @param env environment of the region for which index updates will be built
+   * @throws IOException if no codec class is configured, if the codec cannot be created through a
+   *           no-argument constructor, or if the codec fails to initialize
+   */
+  @Override
+  public void setup(RegionCoprocessorEnvironment env) throws IOException {
+    this.env = env;
+    // setup the phoenix codec. Generally, this will just be the standard one, but abstracting here
+    // so we can use it later when generalizing covered indexes
+    Configuration conf = env.getConfiguration();
+    Class<? extends IndexCodec> codecClass =
+        conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+    // the lookup falls back to the supplied default (null) when the key is unset; report that
+    // directly instead of letting a NullPointerException get wrapped without any context
+    if (codecClass == null) {
+      throw new IOException("No index codec class configured under '" + CODEC_CLASS_NAME_KEY
+          + "'");
+    }
+    try {
+      // the no-arg constructor is not required to be public, so force access before invoking it
+      Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor();
+      meth.setAccessible(true);
+      this.codec = meth.newInstance();
+      this.codec.initialize(env);
+    } catch (IOException e) {
+      // failures raised by the codec itself are passed through untouched
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Could not create and initialize index codec "
+          + codecClass.getName(), e);
+    }
+
+    this.localTable = new LocalTable(env);
+  }
+
+  /**
+   * Build the index updates for a single mutation of the primary table.
+   * @param mutation update for which the index updates should be generated
+   * @return the accumulated index updates, as produced by {@link IndexUpdateManager#toMap()}
+   * @throws IOException if the index updates cannot be built
+   */
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+    // collects the index updates of every timestamp batch in the mutation
+    IndexUpdateManager indexUpdates = new IndexUpdateManager();
+    batchMutationAndAddUpdates(indexUpdates, mutation);
+
+    if (LOG.isDebugEnabled()) {
+      String summary = "Found index updates for Mutation: " + mutation + "\n" + indexUpdates;
+      LOG.debug(summary);
+    }
+    return indexUpdates.toMap();
+  }
+
+  /**
+   * Split the mutation into batches based on the timestamp of each keyvalue and add the index
+   * updates for every batch. Usually all key-values of a mutation share one timestamp, but a
+   * client can add kvs with differing timestamps, so everything is handled per timestamp batch.
+   * <p>
+   * Adds all the updates in the {@link Mutation} to the row state, as a side-effect.
+   * @param manager index updates into which to add new updates. Modified as a side-effect.
+   * @param m mutation to batch
+   * @throws IOException
+   */
+  private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
+    // timestamp-ordered batches of the key-values in the mutation
+    Collection<Batch> timestampBatches = createTimestampBatchesFromMutation(m);
+
+    // one state manager is shared by all the batches
+    LocalTableState rowState = new LocalTableState(env, localTable, m);
+
+    /*
+     * The cleanup and the update are split per batch: once the row state has been updated with the
+     * key-values of one batch, the next batch sees that as the current state, which can cause a
+     * delete and a put to be created for the next batch. So the cleanup of the current state is
+     * only requested until one batch reports that it has done it.
+     */
+    boolean cleanupStillNeeded = true;
+    for (Batch current : timestampBatches) {
+      boolean cleanedUp = addMutationsForBatch(manager, current, rowState, cleanupStillNeeded);
+      cleanupStillNeeded = cleanupStillNeeded && !cleanedUp;
+    }
+  }
+
+  /**
+   * Group all the {@link KeyValue}s in a {@link Mutation} by timestamp, one family at a time via
+   * {@link #createTimestampBatchesFromKeyValues(Collection, Map)}. Any {@link KeyValue} with a
+   * timestamp == {@link HConstants#LATEST_TIMESTAMP} is updated to the time at which its family is
+   * processed.
+   * @param m {@link Mutation} from which to extract the {@link KeyValue}s
+   * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+   */
+  protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+    Map<Long, Batch> byTimestamp = new HashMap<Long, Batch>();
+    for (List<KeyValue> familyKvs : m.getFamilyMap().values()) {
+      createTimestampBatchesFromKeyValues(familyKvs, byTimestamp);
+    }
+
+    // order the batches oldest first
+    Comparator<Batch> oldestFirst = new Comparator<Batch>() {
+      @Override
+      public int compare(Batch left, Batch right) {
+        return Longs.compare(left.getTimestamp(), right.getTimestamp());
+      }
+    };
+    List<Batch> ordered = new ArrayList<Batch>(byTimestamp.values());
+    Collections.sort(ordered, oldestFirst);
+    return ordered;
+  }
+
+ /**
+ * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
+ * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
+ * the time the method is called.
+ * @param kvs {@link KeyValue}s to break into batches
+ * @param batches to update with the given kvs
+ */
+ protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
+ Map<Long, Batch> batches) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ byte[] nowBytes = Bytes.toBytes(now);
+
+ // batch kvs by timestamp
+ for (KeyValue kv : kvs) {
+ long ts = kv.getTimestamp();
+ // override the timestamp to the current time, so the index and primary tables match
+ // all the keys with LATEST_TIMESTAMP will then be put into the same batch
+ if (kv.updateLatestStamp(nowBytes)) {
+ ts = now;
+ }
+ Batch batch = batches.get(ts);
+ if (batch == null) {
+ batch = new Batch(ts);
+ batches.put(ts, batch);
+ }
+ batch.add(kv);
+ }
+ }
+
+ /**
+ * For a single batch, get all the index updates and add them to the updateMap
+ * <p>
+ * This method manages cleaning up the entire history of the row from the given timestamp forward
+ * for out-of-order (e.g. 'back in time') updates.
+ * <p>
+ * If things arrive out of order (client is using custom timestamps) we should still see the index
+ * in the correct order (assuming we scan after the out-of-order update in finished). Therefore,
+ * we when we aren't the most recent update to the index, we need to delete the state at the
+ * current timestamp (similar to above), but also issue a delete for the added index updates at
+ * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so
+ * it looks like it was also deleted at that next newest timestamp. However, its not enough to
+ * just update the one in front of us - that column will likely be applied to index entries up the
+ * entire history in front of us, which also needs to be fixed up.
+ * <p>
+ * However, the current update usually will be the most recent thing to be added. In that case,
+ * all we need to is issue a delete for the previous index row (the state of the row, without the
+ * update applied) at the current timestamp. This gets rid of anything currently in the index for
+ * the current state of the row (at the timestamp). Then we can just follow that by applying the
+ * pending update and building the index update based on the new row state.
+ * @param updateMap map to update with new index elements
+ * @param batch timestamp-based batch of edits
+ * @param state local state to update and pass to the codec
+ * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the
+ * current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
+ * indicates we should not attempt the cleanup, e.g. an earlier batch already did the
+ * cleanup.
+ * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
+ * <tt>false</tt> otherwise
+ * @throws IOException
+ */
+ private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
+ LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
+
+ // need a temporary manager for the current batch. It should resolve any conflicts for the
+ // current batch. Essentially, we can get the case where a batch doesn't change the current
+ // state of the index (all Puts are covered by deletes), in which case we don't want to add
+ // anything
+ // A. Get the correct values for the pending state in the batch
+ // A.1 start by cleaning up the current state - as long as there are key-values in the batch
+ // that are indexed, we need to change the current state of the index. Its up to the codec to
+ // determine if we need to make any cleanup given the pending update.
+ long batchTs = batch.getTimestamp();
+ state.setPendingUpdates(batch.getKvs());
+ addCleanupForCurrentBatch(updateMap, batchTs, state);
+
+ // A.2 do a single pass first for the updates to the current state
+ state.applyPendingUpdates();
+ long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+ // if all the updates are the latest thing in the index, we are done - don't go and fix history
+ if (ColumnTracker.isNewestTime(minTs)) {
+ return false;
+ }
+
+ // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
+ // index. after this, we have the correct view of the index, from the batch up to the index
+ while(!ColumnTracker.isNewestTime(minTs) ){
+ minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+ }
+
+ // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
+ if (requireCurrentStateCleanup) {
+ // roll back the pending update. This is needed so we can remove all the 'old' index entries.
+ // We don't need to do the puts here, but just the deletes at the given timestamps since we
+ // just want to completely hide the incorrect entries.
+ state.rollback(batch.getKvs());
+ // setup state
+ state.setPendingUpdates(batch.getKvs());
+
+ // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
+ // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
+ // because the update may have a different set of columns or value based on the update).
+ cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+
+ // have to roll the state forward again, so the current state is correct
+ state.applyPendingUpdates();
+ return true;
+ }
+ return false;
+ }
+
+  /**
+   * Move the state to the given timestamp and add the index updates for the row as of that time.
+   * @param ts timestamp to set as the current timestamp of the state
+   * @param state local state to position and pass on
+   * @param updateMap to update with index mutations
+   * @return the timestamp returned by
+   *         {@link #addCurrentStateMutationsForBatch(IndexUpdateManager, LocalTableState)}
+   * @throws IOException
+   */
+  private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
+      IndexUpdateManager updateMap) throws IOException {
+    state.setCurrentTimestamp(ts);
+    return addCurrentStateMutationsForBatch(updateMap, state);
+  }
+
+ /**
+ * Add the index deletes for the state of the row at the batch timestamp, then discard the columns
+ * tracked while doing so.
+ * @param updateMap to update with the index deletes
+ * @param batchTs timestamp of the batch; set as the current timestamp of the state and applied to
+ * the deletes
+ * @param state local state passed to the codec
+ * @throws IOException
+ */
+ private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
+ LocalTableState state) throws IOException {
+ // get the cleanup for the current state
+ state.setCurrentTimestamp(batchTs);
+ addDeleteUpdatesToMap(updateMap, state, batchTs);
+ // ignore any index tracking from the delete
+ state.resetTrackedColumns();
+ }
+
+  /**
+   * Add the index mutations for the row as of the current timestamp of the local state. Every
+   * valid upsert from the codec is added; an upsert whose indexed columns have newer timestamps is
+   * followed by a delete of the same index row at the tracked timestamp, so the entry does not
+   * leak past the next newer state.
+   * @param updateMap to update with index mutations
+   * @param state current state of the table
+   * @return the minimum timestamp across all index columns requested. If
+   *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
+   *         timestamp, we know that this <i>was not a back-in-time update</i>.
+   * @throws IOException
+   */
+  private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap,
+      LocalTableState state) throws IOException {
+
+    // get the index updates for this current batch
+    Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+    state.resetTrackedColumns();
+
+    // timestamp of the next update we need to track
+    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+    // a codec with nothing to add is tolerated the same way addDeleteUpdatesToMap tolerates a
+    // codec with nothing to delete
+    if (upserts == null) {
+      return minTs;
+    }
+
+    /*
+     * go through all the pending updates. If we are sure that all the entries are the latest
+     * timestamp, we can just add the index updates and move on. However, if there are columns that
+     * we skip past (based on the timestamp of the batch), we need to roll back up the history.
+     * Regardless of whether or not they are the latest timestamp, the entries here are going to be
+     * correct for the current batch timestamp, so we add them to the updates. The only thing we
+     * really care about is if we need to roll up the history and fix it as we go.
+     */
+    for (IndexUpdate update : upserts) {
+      // this is the one bit where we check the timestamps
+      final ColumnTracker tracker = update.getIndexedColumns();
+      final long trackerTs = tracker.getTS();
+      // update the next min TS we need to track
+      if (trackerTs < minTs) {
+        minTs = trackerTs;
+      }
+      // an update that is not the latest also needs to be cleaned up at the next timestamp
+      final boolean needsCleanup = tracker.hasNewerTimestamps();
+
+      // only make the put if the index update has been setup
+      if (!update.isValid()) {
+        continue;
+      }
+      byte[] table = update.getTableName();
+      Mutation mutation = update.getUpdate();
+      updateMap.addIndexUpdate(table, mutation);
+
+      // only make the cleanup if we made a put and need cleanup
+      if (needsCleanup) {
+        // there is a TS for the interested columns that is greater than the columns in the
+        // put. Therefore, we need to issue a delete at the same timestamp
+        Delete d = new Delete(mutation.getRow());
+        d.setTimestamp(trackerTs);
+        updateMap.addIndexUpdate(table, d);
+      }
+    }
+    return minTs;
+  }
+
+  /**
+   * Cleanup the index based on the current state from the given batch. Walks forward over each
+   * timestamp (for the indexed rows) of the current state of the table and cleans up all the
+   * existing entries generated by the codec.
+   * <p>
+   * Adds all pending updates to the updateMap
+   * @param updateMap updated with the pending index updates from the codec
+   * @param batchTs timestamp from which we should cleanup
+   * @param state current state of the primary table. Should already be setup to the correct state
+   *          from which we want to cleanup.
+   * @throws IOException
+   */
+  private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
+      long batchTs, LocalTableState state) throws IOException {
+    long cleanupTs = batchTs;
+    while (true) {
+      // get the cleanup for the state at this timestamp
+      state.setCurrentTimestamp(cleanupTs);
+      addDeleteUpdatesToMap(updateMap, state, cleanupTs);
+
+      // the smallest tracked timestamp is the next point in history to clean up
+      Set<ColumnTracker> tracked = state.getTrackedColumns();
+      long nextTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+      for (ColumnTracker tracker : tracked) {
+        if (tracker.getTS() < nextTs) {
+          nextTs = tracker.getTS();
+        }
+      }
+      state.resetTrackedColumns();
+
+      if (ColumnTracker.isNewestTime(nextTs)) {
+        // nothing newer is left to clean up
+        return;
+      }
+      // NOTE(review): 'tracked' is copied only after resetTrackedColumns(); if getTrackedColumns()
+      // hands out the state's own set and the reset empties it, these hints are always empty -
+      // confirm against LocalTableState
+      state.setHints(Lists.newArrayList(tracked));
+      cleanupTs = nextTs;
+    }
+  }
+
+
+  /**
+   * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
+   * add the valid ones to the update map, stamped with the given timestamp. A <tt>null</tt> result
+   * from the codec adds nothing.
+   * <p>
+   * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates
+   * applied, etc).
+   * @param updateMap to update with the index deletes
+   * @param state local state passed to the codec
+   * @param ts timestamp to set on each delete
+   * @throws IOException
+   */
+  protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state,
+      long ts) throws IOException {
+    Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
+    if (deletes == null) {
+      return;
+    }
+    for (IndexUpdate candidate : deletes) {
+      if (candidate.isValid()) {
+        // override the timestamps in the delete to match the current batch.
+        Delete remove = (Delete) candidate.getUpdate();
+        remove.setTimestamp(ts);
+        updateMap.addIndexUpdate(candidate.getTableName(), remove);
+      }
+    }
+  }
+
+  /**
+   * Build the index updates for a delete of the primary table. A delete with an empty family map
+   * is treated as a row delete; anything else is batched by timestamp like any other mutation.
+   * @param d delete for which the index updates should be generated
+   * @return the accumulated index updates, as produced by {@link IndexUpdateManager#toMap()}
+   * @throws IOException if the index updates cannot be built
+   */
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
+    // stores all the return values
+    IndexUpdateManager indexUpdates = new IndexUpdateManager();
+
+    // We have to figure out which kind of delete it is, since we need to do different things if
+    // it's a general (row) delete, versus a delete of just a single column or family
+    Map<byte[], List<KeyValue>> families = d.getFamilyMap();
+
+    if (!families.isEmpty()) {
+      // Option 2: It's actually a bunch of single updates, which can have different timestamps.
+      // Therefore, we need to do something similar to the put case and batch by timestamp
+      batchMutationAndAddUpdates(indexUpdates, d);
+    } else {
+      /*
+       * Option 1: it's a row delete marker, so we just need to delete the most recent state for
+       * each group, as of the specified timestamp in the delete. This can happen if we have a
+       * single row update and it is part of a batch mutation (prepare doesn't happen until
+       * later... maybe a bug?). In a single delete, this delete gets all the column families
+       * appended, so the family map won't be empty by the time it gets here.
+       */
+      LocalTableState state = new LocalTableState(env, localTable, d);
+      // get a consistent view of 'now'
+      long now = d.getTimeStamp();
+      if (now == HConstants.LATEST_TIMESTAMP) {
+        now = EnvironmentEdgeManager.currentTimeMillis();
+        // update the delete's idea of 'now' to be consistent with the index
+        d.setTimestamp(now);
+      }
+      // only the deletes are needed from the codec, not puts, because this delete covers all
+      // columns
+      addDeleteUpdatesToMap(indexUpdates, state, now);
+
+      /*
+       * Update the current state for all the kvs in the delete. Generally, we would just iterate
+       * the family map, but since we got here, the family map is empty! Therefore, we need to fake
+       * a bunch of family deletes (just like how HRegion#prepareDelete works). This is just needed
+       * for the current version of HBase that has an issue where the batch update doesn't update
+       * the deletes before calling the hook.
+       */
+      byte[] deleteRow = d.getRow();
+      for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
+        KeyValue familyDelete =
+            new KeyValue(deleteRow, family, null, now, KeyValue.Type.DeleteFamily);
+        state.addPendingUpdates(familyDelete);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Found index updates for Delete: " + d + "\n" + indexUpdates);
+    }
+
+    return indexUpdates.toMap();
+  }
+
+ /**
+ * Not implemented yet: no index updates are built for the filtered key-values.
+ * @param filtered key-values for which index updates are requested; currently ignored
+ * @return always <tt>null</tt>
+ * @throws IOException never thrown by this implementation
+ */
+ @Override
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+ Collection<KeyValue> filtered) throws IOException {
+ // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
+ return null;
+ }
+
+ /**
+ * Exposed for testing! Replaces the codec this builder consults, without initializing it.
+ * @param codec codec to use for this instance of the builder
+ */
+ public void setIndexCodecForTesting(IndexCodec codec) {
+ this.codec = codec;
+ }
+
+ /**
+ * Check whether indexing should be attempted for the mutation; the decision is delegated to the
+ * codec.
+ * @param m mutation that may need to be indexed
+ * @return whatever {@link IndexCodec#isEnabled(Mutation)} returns for the mutation
+ * @throws IOException if the codec fails to make the decision
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ // ask the codec to see if we should even attempt indexing
+ return this.codec.isEnabled(m);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/IndexCodec.java b/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
new file mode 100644
index 0000000..267ea33
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import org.apache.phoenix.index.BaseIndexCodec;
+
+
+/**
+ * Codec for creating index updates from the current state of a table.
+ * <p>
+ * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as
+ * features need to be added to the codec, as well as potentially not having to implement some
+ * methods.
+ */
+public interface IndexCodec {
+
+ /**
+ * Do any codec initialization necessary
+ * @param env environment in which the codec is operating
+ * @throws IOException if the codec cannot be initialized correctly
+ */
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+ /**
+ * Get the index cleanup entries. Currently, this must return just single row deletes (where just
+ * the row-key is specified and no columns are returned) mapped to the table name. For instance,
+ * if you have an index 'myIndex' with row :
+ *
+ * <pre>
+ * v1,v2,v3 | CF:CQ0 | rowkey
+ * | CF:CQ1 | rowkey
+ * </pre>
+ *
+ * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
+ * @param state the current state of the table that needs to be cleaned up. Generally, you only
+ * care about the latest column values, for each column you are indexing for each index
+ * table.
+ * @return the pairs of (deletes, index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+
+ // table state has the pending update already applied, before calling
+ // get the new index entries
+ /**
+ * Get the index updates for the primary table state, for each index table. The returned
+ * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
+ * key-values multiple times.
+ * <p>
+ * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
+ * the index entries match the primary table row. This could be managed at a higher level, but
+ * would require iterating all the kvs in the Put again - very inefficient when compared to the
+ * current interface where you must provide a timestamp anyways (so you might as well provide the
+ * right one).
+ * @param state the current state of the table that needs an index update. Generally, you only
+ * care about the latest column values, for each column you are indexing for each index
+ * table.
+ * @return the pairs of (updates,index table name) that should be applied.
+ * @throws IOException
+ */
+ public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+
+ /**
+ * This allows the codec to dynamically change whether or not indexing should take place for a
+ * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
+ * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
+ * <p>
+ * We can also be smart about even indexing a given update here too - if the update doesn't
+ * contain any columns that we care about indexing, we can save the effort of analyzing the put
+ * and further.
+ * @param m mutation that should be indexed.
+ * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
+ * basis, as each codec is instantiated per-region.
+ * @throws IOException
+ */
+ public boolean isEnabled(Mutation m) throws IOException;
+
+ /**
+ * Get the batch identifier of the given mutation. Generally, updates to the table will take place
+ * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
+ * much more intelligently.
+ * <p>
+ * <b>If you have batches that have multiple updates to the same row state, you must specify a
+ * batch id for each batch. Otherwise, we cannot guarantee index correctness</b>
+ * @param m mutation that may or may not be part of the batch
+ * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
+ */
+ public byte[] getBatchId(Mutation m);
+}
\ No newline at end of file