You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:45 UTC
[44/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java b/src/main/java/org/apache/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
new file mode 100644
index 0000000..2fbdc1b
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import org.apache.hbase.index.covered.update.ColumnTracker;
+
+/**
+ * Similar to the {@link MaxTimestampFilter}, but also updates the 'next largest' timestamp seen
+ * that is not skipped by the below criteria. Note that it isn't as quick as the
+ * {@link MaxTimestampFilter} as we can't just seek ahead to a key with the matching timestamp, but
+ * have to iterate each kv until we find the right one with an allowed timestamp.
+ * <p>
+ * Inclusively filter on the maximum timestamp allowed. Excludes all elements greater than (but not
+ * equal to) the given timestamp, so given ts = 5, a {@link KeyValue} with ts 6 is excluded, but not
+ * one with ts = 5.
+ * <p>
+ * This filter generally doesn't make sense on its own - it should follow a per-column filter and
+ * possibly a per-delete filter to only track the most recent (but not exposed to the user)
+ * timestamp.
+ */
+public class ColumnTrackingNextLargestTimestampFilter extends FilterBase {
+
+  private final long ts; // newest timestamp (inclusive) that is still let through
+  private final ColumnTracker column; // told about every timestamp skipped as too new
+
+  public ColumnTrackingNextLargestTimestampFilter(long maxTime, ColumnTracker toTrack) {
+    this.ts = maxTime;
+    this.column = toTrack;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    long timestamp = v.getTimestamp();
+    if (timestamp <= ts) {
+      return ReturnCode.INCLUDE;
+    }
+    this.column.setTs(timestamp); // too new: report it to the tracker, then skip the kv
+    return ReturnCode.SKIP;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/filter/FamilyOnlyFilter.java b/src/main/java/org/apache/hbase/index/covered/filter/FamilyOnlyFilter.java
new file mode 100644
index 0000000..d5796fe
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.filter;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+
+/**
+ * Similar to the {@link FamilyFilter} but stops when the end of the family is reached and only
+ * supports equality
+ */
+public class FamilyOnlyFilter extends FamilyFilter {
+
+  // set once a key of the matching family has been seen and a later key is skipped
+  boolean done = false;
+  // whether an earlier key (since the last reset) was included by the family check
+  private boolean previousMatchFound;
+
+  /**
+   * Build a filter that only accepts keys whose family is byte-for-byte equal to the given one.
+   * @param family family bytes to match
+   */
+  public FamilyOnlyFilter(final byte[] family) {
+    this(new BinaryComparator(family));
+  }
+
+  /**
+   * Build a filter that only accepts keys whose family the comparator reports as equal.
+   * @param familyComparator comparator applied to the family
+   */
+  public FamilyOnlyFilter(final WritableByteArrayComparable familyComparator) {
+    super(CompareOp.EQUAL, familyComparator);
+  }
+
+  @Override
+  public boolean filterAllRemaining() {
+    return done;
+  }
+
+  @Override
+  public void reset() {
+    previousMatchFound = false;
+    done = false;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    // the family has already ended, so every remaining key is skipped
+    if (done) {
+      return ReturnCode.SKIP;
+    }
+    final ReturnCode verdict = super.filterKeyValue(v);
+    if (!previousMatchFound) {
+      // nothing matched so far: the only thing worth recording is the first match
+      previousMatchFound = verdict.equals(ReturnCode.INCLUDE);
+    } else if (verdict.equals(ReturnCode.SKIP)) {
+      // a match came earlier and this key is skipped, so the family has ended
+      done = true;
+    }
+    return verdict;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/filter/MaxTimestampFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/filter/MaxTimestampFilter.java b/src/main/java/org/apache/hbase/index/covered/filter/MaxTimestampFilter.java
new file mode 100644
index 0000000..23d17e7
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/filter/MaxTimestampFilter.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Inclusive filter on the maximum timestamp allowed. Excludes all elements greater than (but not
+ * equal to) the given timestamp, so given ts = 5, a {@link KeyValue} with ts 6 is excluded, but not
+ * one with ts = 5.
+ */
+public class MaxTimestampFilter extends FilterBase {
+
+  /** Largest timestamp that is still included. */
+  private long ts;
+
+  /** @param maxTime largest timestamp (inclusive) to let through */
+  public MaxTimestampFilter(long maxTime) {
+    this.ts = maxTime;
+  }
+
+  /**
+   * Builds the seek hint: a copy of the current key whose timestamp bytes are overwritten with
+   * the maximum allowed timestamp.
+   * @param currentKV key the hint is derived from
+   */
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) {
+    // copying might be a little excessive, but it keeps us from disturbing other filters
+    KeyValue hint = currentKV.deepCopy();
+    byte[] maxTsBytes = Bytes.toBytes(this.ts);
+    // write the limit over the timestamp portion of the copy's backing buffer
+    System.arraycopy(maxTsBytes, 0, hint.getBuffer(), hint.getTimestampOffset(),
+      maxTsBytes.length);
+    return hint;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    // newer than the limit: ask for a seek using the hint; otherwise keep the kv
+    return v.getTimestamp() > ts ? ReturnCode.SEEK_NEXT_USING_HINT : ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/filter/NewerTimestampFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/filter/NewerTimestampFilter.java b/src/main/java/org/apache/hbase/index/covered/filter/NewerTimestampFilter.java
new file mode 100644
index 0000000..8355f4d
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/filter/NewerTimestampFilter.java
@@ -0,0 +1,37 @@
+package org.apache.hbase.index.covered.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+/**
+ * Server-side only class used in the indexer to filter out keyvalues newer than a given timestamp
+ * (so allows anything {@code <=} timestamp through).
+ * <p>
+ * Note, this class doesn't support {@link #write(DataOutput)} or {@link #readFields(DataInput)}.
+ */
+public class NewerTimestampFilter extends FilterBase {
+
+  private final long timestamp; // newest timestamp (inclusive) that is let through
+
+  public NewerTimestampFilter(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(KeyValue kv) {
+    return kv.getTimestamp() > timestamp ? ReturnCode.SKIP : ReturnCode.INCLUDE;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("TimestampFilter is server-side only!");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/update/ColumnReference.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/update/ColumnReference.java b/src/main/java/org/apache/hbase/index/covered/update/ColumnReference.java
new file mode 100644
index 0000000..d5d3f96
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/update/ColumnReference.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.update;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Reference to a column: a family plus a qualifier, or every qualifier of the family when built
+ * with {@link #ALL_QUALIFIERS}. References order by family first, then by qualifier.
+ */
+public class ColumnReference implements Comparable<ColumnReference> {
+
+  /** Sentinel, recognized by identity (not content): marks a reference to a whole family. */
+  public static final byte[] ALL_QUALIFIERS = new byte[0];
+
+  private static int calcHashCode(byte[] family, byte[] qualifier) {
+    // usual 31-multiplier combination, starting from 1
+    int hash = 31 + Bytes.hashCode(family);
+    return 31 * hash + Bytes.hashCode(qualifier);
+  }
+
+  // computed once in the constructor from the family and qualifier
+  private final int hashCode;
+  protected final byte[] family;
+  protected final byte[] qualifier;
+
+  public ColumnReference(byte[] family, byte[] qualifier) {
+    this.family = family;
+    this.qualifier = qualifier;
+    this.hashCode = calcHashCode(family, qualifier);
+  }
+
+  public byte[] getFamily() {
+    return this.family;
+  }
+
+  public byte[] getQualifier() {
+    return this.qualifier;
+  }
+
+  /**
+   * @param kv key to check against
+   * @return <tt>true</tt> if this column covers both the family and the qualifier of the key.
+   */
+  public boolean matches(KeyValue kv) {
+    return matchesFamily(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength())
+        && matchesQualifier(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+  }
+
+  /**
+   * @param qual qualifier to test
+   * @return <tt>true</tt> if this column covers the given qualifier.
+   */
+  public boolean matchesQualifier(byte[] qual) {
+    return matchesQualifier(qual, 0, qual.length);
+  }
+
+  public boolean matchesQualifier(byte[] bytes, int offset, int length) {
+    return allColumns() || match(bytes, offset, length, qualifier);
+  }
+
+  /**
+   * @param family family to test
+   * @return <tt>true</tt> if this column covers the given family.
+   */
+  public boolean matchesFamily(byte[] family) {
+    return matchesFamily(family, 0, family.length);
+  }
+
+  public boolean matchesFamily(byte[] bytes, int offset, int length) {
+    return match(bytes, offset, length, family);
+  }
+
+  /**
+   * @return <tt>true</tt> if this should include all column qualifiers, <tt>false</tt> otherwise
+   */
+  public boolean allColumns() {
+    return this.qualifier == ALL_QUALIFIERS;
+  }
+
+  /**
+   * Compare a slice of the passed bytes against the stored bytes.
+   * @param first array holding the bytes to test; a <tt>null</tt> array never matches
+   * @param storedKey the stored byte[], should never be <tt>null</tt>
+   * @return <tt>true</tt> if they are byte-equal
+   */
+  private boolean match(byte[] first, int offset, int length, byte[] storedKey) {
+    return first != null && Bytes.equals(first, offset, length, storedKey, 0, storedKey.length);
+  }
+
+  /** First-on-row key for this column; a whole-family reference passes a null qualifier. */
+  public KeyValue getFirstKeyValueForRow(byte[] row) {
+    return KeyValue.createFirstOnRow(row, family, qualifier == ALL_QUALIFIERS ? null : qualifier);
+  }
+
+  @Override
+  public int compareTo(ColumnReference o) {
+    int byFamily = Bytes.compareTo(family, o.family);
+    // qualifiers only matter when the families are the same
+    return byFamily != 0 ? byFamily : Bytes.compareTo(qualifier, o.qualifier);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ColumnReference)) {
+      return false;
+    }
+    ColumnReference other = (ColumnReference) o;
+    // compare the cached hashes first, then the family and qualifier bytes
+    return hashCode == other.hashCode && Bytes.equals(family, other.family)
+        && Bytes.equals(qualifier, other.qualifier);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnReference - " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/update/ColumnTracker.java b/src/main/java/org/apache/hbase/index/covered/update/ColumnTracker.java
new file mode 100644
index 0000000..1fe40a7
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/update/ColumnTracker.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.update;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Simple POJO for tracking a bunch of column references and the next-newest timestamp for those
+ * columns
+ * <p>
+ * Two {@link ColumnTracker}s are considered equal if they track the same columns, even if their
+ * timestamps are different.
+ */
+public class ColumnTracker implements IndexedColumnGroup {
+
+ public static final long NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP = Long.MAX_VALUE;
+ public static final long GUARANTEED_NEWER_UPDATES = Long.MIN_VALUE;
+ private final List<ColumnReference> columns;
+ private long ts = NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+ private final int hashCode;
+
+ private static int calcHashCode(List<ColumnReference> columns) {
+ return columns.hashCode();
+ }
+
+ public ColumnTracker(Collection<? extends ColumnReference> columns) {
+ this.columns = new ArrayList<ColumnReference>(columns);
+ // sort the columns
+ Collections.sort(this.columns);
+ this.hashCode = calcHashCode(this.columns);
+ }
+
+ /**
+ * Set the current timestamp, only if the passed timestamp is strictly less than the currently
+ * stored timestamp
+ * @param ts the timestmap to potentially store.
+ * @return the currently stored timestamp.
+ */
+ public long setTs(long ts) {
+ this.ts = this.ts > ts ? ts : this.ts;
+ return this.ts;
+ }
+
+ public long getTS() {
+ return this.ts;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if(!(o instanceof ColumnTracker)){
+ return false;
+ }
+ ColumnTracker other = (ColumnTracker)o;
+ if (hashCode != other.hashCode) {
+ return false;
+ }
+ if (other.columns.size() != columns.size()) {
+ return false;
+ }
+
+ // check each column to see if they match
+ for (int i = 0; i < columns.size(); i++) {
+ if (!columns.get(i).equals(other.columns.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public List<ColumnReference> getColumns() {
+ return this.columns;
+ }
+
+ /**
+ * @return <tt>true</tt> if this set of columns has seen a column with a timestamp newer than the
+ * requested timestamp, <tt>false</tt> otherwise.
+ */
+ public boolean hasNewerTimestamps() {
+ return !isNewestTime(this.ts);
+ }
+
+ /**
+ * @param ts timestamp to check
+ * @return <tt>true</tt> if the timestamp is at the most recent timestamp for a column
+ */
+ public static boolean isNewestTime(long ts) {
+ return ts == NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/update/IndexUpdateManager.java b/src/main/java/org/apache/hbase/index/covered/update/IndexUpdateManager.java
new file mode 100644
index 0000000..2eacfb8
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/update/IndexUpdateManager.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.update;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Keeps track of the index updates
+ */
+public class IndexUpdateManager {
+
+  public Comparator<Mutation> COMPARATOR = new MutationComparator();
+  /** Orders mutations by row, then by reverse timestamp, then deletes before puts. */
+  class MutationComparator implements Comparator<Mutation> {
+
+    @Override
+    public int compare(Mutation o1, Mutation o2) {
+      // always sort rows first
+      int compare = o1.compareTo(o2);
+      if (compare != 0) {
+        return compare;
+      }
+
+      // if same row, sort by reverse timestamp (larger first)
+      compare = Longs.compare(o2.getTimeStamp(), o1.getTimeStamp());
+      if (compare != 0) {
+        return compare;
+      }
+      // deletes always sort before puts for the same row
+      if (o1 instanceof Delete) {
+        // same row, same ts == same delete (we only delete rows); else o2 has to be a put
+        return o2 instanceof Delete ? 0 : -1;
+      }
+      // o1 must be a put
+      if (o2 instanceof Delete) {
+        return 1;
+      } else if (o2 instanceof Put) {
+        return comparePuts((Put) o1, (Put) o2);
+      }
+
+      throw new RuntimeException(
+          "Got unexpected mutation types! Can only be Put or Delete, but got: " + o1 + ", and "
+              + o2);
+    }
+
+    /** Orders puts by {@link Put#size()}, then by {@link Put#heapSize()}. */
+    private int comparePuts(Put p1, Put p2) {
+      int p1Size = p1.size();
+      int p2Size = p2.size();
+      int compare = p1Size - p2Size;
+      if (compare == 0) {
+        // TODO: make this a real comparison
+        // this is a little cheating, but we don't really need to worry too much about this being
+        // the same - chances are that exact matches here are really the same update.
+        return Longs.compare(p1.heapSize(), p2.heapSize());
+      }
+      return compare;
+    }
+
+  }
+
+  private static final String PHOENIX_HBASE_TEMP_DELETE_MARKER = "phoenix.hbase.temp.delete.marker";
+  private static final byte[] TRUE_MARKER = new byte[] { 1 };
+
+  protected final Map<ImmutableBytesPtr, Collection<Mutation>> map =
+      new HashMap<ImmutableBytesPtr, Collection<Mutation>>();
+
+  /**
+   * Add an index update. Keeps the latest {@link Put} for a given timestamp
+   * @param tableName name of the table the update is stored under
+   * @param m the update to add
+   */
+  public void addIndexUpdate(byte[] tableName, Mutation m) {
+    // we only keep the most recent update
+    ImmutableBytesPtr key = new ImmutableBytesPtr(tableName);
+    Collection<Mutation> updates = map.get(key);
+    if (updates == null) {
+      updates = new SortedCollection<Mutation>(COMPARATOR);
+      map.put(key, updates);
+    }
+    fixUpCurrentUpdates(updates, m);
+  }
+
+  /**
+   * Fix up the current updates, given the pending mutation.
+   * @param updates current updates
+   * @param pendingMutation mutation to merge into the current updates
+   */
+  protected void fixUpCurrentUpdates(Collection<Mutation> updates, Mutation pendingMutation) {
+    // need to check for each entry to see if we have a duplicate
+    Mutation toRemove = null;
+    Delete pendingDelete = pendingMutation instanceof Delete ? (Delete) pendingMutation : null;
+    boolean sawRowMatch = false;
+    for (Mutation stored : updates) {
+      int compare = pendingMutation.compareTo(stored);
+      // skip to the right row
+      if (compare < 0) {
+        continue;
+      } else if (compare > 0) {
+        if (sawRowMatch) {
+          break;
+        }
+        continue;
+      }
+
+      // set that we saw a row match, so any greater row will necessarily be the wrong row
+      sawRowMatch = true;
+
+      // skip until we hit the right timestamp
+      if (stored.getTimeStamp() < pendingMutation.getTimeStamp()) {
+        continue;
+      }
+
+      if (stored instanceof Delete) {
+        // we already have a delete for this row, so we are done.
+        if (pendingDelete != null) {
+          return;
+        }
+        // pending update must be a Put, so we ignore the Put.
+        // add a marker to this delete that it has been canceled out already. We need to keep
+        // the delete around though so we can figure out if other Puts would also be canceled out.
+        markMutationForRemoval(stored);
+        return;
+      }
+
+      // otherwise, the stored mutation is a Put. Either way, we want to remove it. If the pending
+      // update is a delete, we need to remove the entry (no longer applies - covered by the
+      // delete), or it's an older version of the row, so we cover it with the newer.
+      toRemove = stored;
+      if (pendingDelete != null) {
+        // the pending delete is still added below, but it is marked so that toMap() skips it
+        markMutationForRemoval(pendingMutation);
+        break;
+      }
+    }
+
+    updates.remove(toRemove);
+    updates.add(pendingMutation);
+  }
+
+  /** Flags the mutation so that {@link #toMap()} leaves it out. */
+  private void markMutationForRemoval(Mutation m) {
+    m.setAttribute(PHOENIX_HBASE_TEMP_DELETE_MARKER, TRUE_MARKER);
+  }
+
+  /** @return every update not marked for removal, paired with the name of its table */
+  public List<Pair<Mutation, byte[]>> toMap() {
+    List<Pair<Mutation, byte[]>> updateMap = Lists.newArrayList();
+    for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
+      // get is ok because we always set with just the bytes
+      byte[] tableName = updates.getKey().get();
+      // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
+      // HBase does
+      for (Mutation m : updates.getValue()) {
+        // skip elements that have been marked for delete
+        if (shouldBeRemoved(m)) {
+          continue;
+        }
+        updateMap.add(new Pair<Mutation, byte[]>(m, tableName));
+      }
+    }
+    return updateMap;
+  }
+
+  /**
+   * @param updates (mutation, table name) pairs, each added via {@link #addIndexUpdate}
+   */
+  public void addAll(Collection<Pair<Mutation, String>> updates) {
+    for (Pair<Mutation, String> update : updates) {
+      addIndexUpdate(Bytes.toBytes(update.getSecond()), update.getFirst());
+    }
+  }
+
+  /** @return <tt>true</tt> if the mutation carries the removal marker */
+  private boolean shouldBeRemoved(Mutation m) {
+    return m.getAttribute(PHOENIX_HBASE_TEMP_DELETE_MARKER) != null;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Pending Index Updates:\n");
+    for (Entry<ImmutableBytesPtr, Collection<Mutation>> entry : map.entrySet()) {
+      String tableName = Bytes.toString(entry.getKey().get());
+      sb.append(" Table: '" + tableName + "'\n");
+      for (Mutation m : entry.getValue()) {
+        sb.append("\t");
+        if (shouldBeRemoved(m)) {
+          sb.append("[REMOVED]");
+        }
+        sb.append(m.getClass().getSimpleName() + ":"
+            + ((m instanceof Put) ? m.getTimeStamp() + " " : ""));
+        sb.append(" row=" + Bytes.toString(m.getRow()));
+        sb.append("\n");
+        if (m.getFamilyMap().isEmpty()) {
+          sb.append("\t\t=== EMPTY ===\n");
+        }
+        for (List<KeyValue> kvs : m.getFamilyMap().values()) {
+          for (KeyValue kv : kvs) {
+            sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValue()));
+            sb.append("\n");
+          }
+        }
+      }
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/update/IndexedColumnGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/update/IndexedColumnGroup.java b/src/main/java/org/apache/hbase/index/covered/update/IndexedColumnGroup.java
new file mode 100644
index 0000000..771286b
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/update/IndexedColumnGroup.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.update;
+
+import java.util.List;
+
+/**
+ * Group of columns that were requested to build an index
+ */
+public interface IndexedColumnGroup {
+  /** @return the columns that make up this group */
+  List<ColumnReference> getColumns();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/update/SortedCollection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/update/SortedCollection.java b/src/main/java/org/apache/hbase/index/covered/update/SortedCollection.java
new file mode 100644
index 0000000..2aed74a
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/update/SortedCollection.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered.update;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * A collection whose elements are returned in sorted order.
+ * <p>
+ * We can't just use something like a {@link PriorityQueue} because its iterator and array views
+ * expose the elements in internal heap order rather than sorted order. This class stores the
+ * elements in a {@link PriorityQueue} and sorts a copy of them whenever they are read back through
+ * {@link #iterator()}, {@link #toArray()} or {@link #toArray(Object[])}, so all three views agree
+ * on the ordering (as required by the {@link Collection#toArray()} contract).
+ * <p>
+ * No synchronization is performed by this class.
+ * @param <T> type of the stored elements
+ */
+class SortedCollection<T> implements Collection<T>, Iterable<T> {
+
+  private final PriorityQueue<T> queue;
+  /** Ordering applied when elements are read back; <tt>null</tt> means natural ordering. */
+  private final Comparator<T> comparator;
+
+  /**
+   * Use the given comparator to compare all keys for sorting
+   * @param comparator ordering to apply to the elements
+   */
+  public SortedCollection(Comparator<T> comparator) {
+    this.queue = new PriorityQueue<T>(1, comparator);
+    this.comparator = comparator;
+  }
+
+  /**
+   * All passed elements are expected to be {@link Comparable}; they are sorted by their natural
+   * ordering.
+   */
+  public SortedCollection() {
+    this.queue = new PriorityQueue<T>();
+    this.comparator = null;
+  }
+
+  @Override
+  public int size() {
+    return this.queue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.queue.isEmpty();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    return this.queue.contains(o);
+  }
+
+  /**
+   * @return an iterator over a sorted copy of the current elements. Because it walks a copy, later
+   *         changes to this collection are not reflected in an iterator that was already created.
+   */
+  @Override
+  public Iterator<T> iterator() {
+    // the snapshot only ever holds elements that were added as T, so the cast is safe
+    @SuppressWarnings("unchecked")
+    T[] sorted = (T[]) sortedSnapshot();
+    return Iterators.forArray(sorted);
+  }
+
+  /**
+   * @return a newly allocated array holding the elements in sorted order, i.e. in the same order
+   *         as {@link #iterator()} returns them
+   */
+  @Override
+  public Object[] toArray() {
+    return sortedSnapshot();
+  }
+
+  /**
+   * Same as {@link #toArray()}, but stores the elements in the passed array when it is large
+   * enough (see {@link PriorityQueue#toArray(Object[])} for the exact allocation rules).
+   * @param a array to fill, or whose runtime type is used to allocate a larger array
+   * @return the array holding the elements in sorted order
+   */
+  @Override
+  public <E> E[] toArray(E[] a) {
+    // capture the size first: if 'a' is longer than the queue, only the leading elements belong
+    // to this collection and the slot after them is the null terminator written by the queue
+    int count = this.queue.size();
+    E[] filled = this.queue.toArray(a);
+    sortLeading(filled, count);
+    return filled;
+  }
+
+  @Override
+  public boolean add(T e) {
+    return this.queue.add(e);
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    return this.queue.remove(o);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    return this.queue.containsAll(c);
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> c) {
+    return this.queue.addAll(c);
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    return this.queue.removeAll(c);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    return this.queue.retainAll(c);
+  }
+
+  @Override
+  public void clear() {
+    this.queue.clear();
+  }
+
+  /**
+   * @return a fresh, sorted copy of the queue's contents
+   */
+  private Object[] sortedSnapshot() {
+    Object[] snapshot = this.queue.toArray();
+    sortLeading(snapshot, snapshot.length);
+    return snapshot;
+  }
+
+  /**
+   * Sort the first <tt>count</tt> entries of the array with this collection's ordering.
+   * @param elements array whose leading entries are elements of this collection
+   * @param count number of leading entries to sort
+   */
+  @SuppressWarnings("unchecked")
+  private void sortLeading(Object[] elements, int count) {
+    // a null comparator makes Arrays.sort fall back to the elements' natural ordering
+    Arrays.sort(elements, 0, count, (Comparator<Object>) (Comparator<?>) this.comparator);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/exception/IndexWriteException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/exception/IndexWriteException.java b/src/main/java/org/apache/hbase/index/exception/IndexWriteException.java
new file mode 100644
index 0000000..2fdabf6
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/exception/IndexWriteException.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.exception;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+
+/**
+ * Generic {@link Exception} signalling that a write to an index has failed.
+ */
+@SuppressWarnings("serial")
+public class IndexWriteException extends HBaseIOException {
+
+  /**
+   * Create the exception without a message or a cause.
+   */
+  public IndexWriteException() {
+    super();
+  }
+
+  /**
+   * @param message description of the failure
+   */
+  public IndexWriteException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause underlying reason for the failure
+   */
+  public IndexWriteException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param cause underlying reason for the failure
+   */
+  public IndexWriteException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/exception/MultiIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/exception/MultiIndexWriteFailureException.java b/src/main/java/org/apache/hbase/index/exception/MultiIndexWriteFailureException.java
new file mode 100644
index 0000000..4b34968
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/exception/MultiIndexWriteFailureException.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.exception;
+
+import java.util.List;
+
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Signals that the write to more than one index table has failed.
+ */
+@SuppressWarnings("serial")
+public class MultiIndexWriteFailureException extends IndexWriteException {
+
+  // kept by reference: the list handed to the constructor is the one returned by the getter
+  private List<HTableInterfaceReference> failures;
+
+  /**
+   * @param failures the tables to which the index write did not succeed
+   */
+  public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures) {
+    super("Failed to write to multiple index tables");
+    this.failures = failures;
+  }
+
+  /**
+   * @return the tables to which the index write did not succeed, as passed on construction
+   */
+  public List<HTableInterfaceReference> getFailedTables() {
+    return failures;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/exception/SingleIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/exception/SingleIndexWriteFailureException.java b/src/main/java/org/apache/hbase/index/exception/SingleIndexWriteFailureException.java
new file mode 100644
index 0000000..c2a0baa
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/exception/SingleIndexWriteFailureException.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.exception;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * Exception thrown if we cannot successfully write to an index table.
+ */
+@SuppressWarnings("serial")
+public class SingleIndexWriteFailureException extends IndexWriteException {
+
+  // stays null when the exception is built without a known target table
+  private String table;
+
+  /**
+   * Cannot reach the index, but not sure of the table or the mutations that caused the failure
+   * @param msg more description of what happened
+   * @param cause original cause
+   */
+  public SingleIndexWriteFailureException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * Failed to write the passed mutations to an index table for some reason.
+   * @param targetTableName index table to which we attempted to write
+   * @param mutations mutations that were attempted
+   * @param cause underlying reason for the failure; may be <tt>null</tt>, in which case the
+   *          message reports the cause as "UNKNOWN"
+   */
+  public SingleIndexWriteFailureException(String targetTableName, List<Mutation> mutations,
+      Exception cause) {
+    // The conditional must be parenthesized: '+' binds tighter than '==' and '?:', so without
+    // the parentheses the whole concatenated string is compared to null, the table/edits never
+    // make it into the message, and a null cause triggers a NullPointerException.
+    super("Failed to make index update:\n\t table: " + targetTableName + "\n\t edits: " + mutations
+        + "\n\tcause: " + (cause == null ? "UNKNOWN" : cause.getMessage()), cause);
+    this.table = targetTableName;
+  }
+
+  /**
+   * @return The table to which we failed to write the index updates. If unknown, returns
+   *         <tt>null</tt>
+   */
+  public String getTableName() {
+    return this.table;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/BaseTaskRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/BaseTaskRunner.java b/src/main/java/org/apache/hbase/index/parallel/BaseTaskRunner.java
new file mode 100644
index 0000000..3cae70c
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/BaseTaskRunner.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * {@link TaskRunner} that just manages the underlying thread pool. On a call to
+ * {@link #stop(String)}, the thread pool is shut down immediately via
+ * {@link ExecutorService#shutdownNow()} and this runner reports itself as stopped.
+ * <p>
+ * If we find a failure the failure is propagated to the {@link TaskBatch} so any {@link Task} that
+ * is interested can kill itself as well.
+ */
+public abstract class BaseTaskRunner implements TaskRunner {
+
+  private static final Log LOG = LogFactory.getLog(BaseTaskRunner.class);
+  protected ListeningExecutorService writerPool;
+  // volatile: stop() may be called from a different thread than the one polling isStopped() in
+  // submitUninterruptible()
+  private volatile boolean stopped;
+
+  /**
+   * @param service thread pool to which tasks are submitted; it is shut down on
+   *          {@link #stop(String)}
+   */
+  public BaseTaskRunner(ExecutorService service) {
+    this.writerPool = MoreExecutors.listeningDecorator(service);
+  }
+
+  /**
+   * Submit every task of the batch to the pool and wait on the combined future built by
+   * {@link #submitTasks(List)}.
+   * @param tasks batch to run; it is aborted if waiting ends in a cancellation or task failure
+   * @return the result of the combined future
+   * @throws CancellationException if the combined future was cancelled
+   * @throws ExecutionException if the combined future failed
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  @Override
+  public <R> List<R> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
+      InterruptedException {
+    // submit each task to the pool and queue it up to be watched
+    List<ListenableFuture<R>> futures = new ArrayList<ListenableFuture<R>>(tasks.size());
+    for (Task<R> task : tasks.getTasks()) {
+      futures.add(this.writerPool.submit(task));
+    }
+    try {
+      // This logic is actually much more synchronized than the previous logic. Now we rely on a
+      // synchronization around the status to tell us when we are done. While this does have the
+      // advantage of being (1) less code, and (2) supported as part of a library, it is just that
+      // little bit slower. If push comes to shove, we can revert back to the previous
+      // implementation, but for right now, this works just fine.
+      return submitTasks(futures).get();
+    } catch (CancellationException e) {
+      // propagate the failure back out
+      logAndNotifyAbort(e, tasks);
+      throw e;
+    } catch (ExecutionException e) {
+      // propagate the failure back out
+      logAndNotifyAbort(e, tasks);
+      throw e;
+    }
+  }
+
+  /**
+   * Log the failure and abort the passed {@link Abortable} with the failure's cause.
+   * @param e failure that was found
+   * @param abort to notify of the failure
+   */
+  private void logAndNotifyAbort(Exception e, Abortable abort) {
+    String msg = "Found a failed task because: " + e.getMessage();
+    LOG.error(msg, e);
+    abort.abort(msg, e.getCause());
+  }
+
+  /**
+   * Build a ListenableFuture for the tasks. Implementing classes can determine return behaviors on
+   * the given tasks
+   * @param futures to wait on
+   * @return a single {@link ListenableFuture} that completes based on the passed tasks.
+   */
+  protected abstract <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures);
+
+  /**
+   * Run the batch via {@link #submit(TaskBatch)}, ignoring interrupts until this runner has been
+   * stopped. Note that each interrupt received while waiting causes the batch to be submitted
+   * again. The interrupt status of the calling thread is restored before returning or throwing.
+   * @param tasks batch to run
+   * @return the result of {@link #submit(TaskBatch)}
+   * @throws EarlyExitFailure if this runner is stopped before a result could be obtained
+   * @throws ExecutionException if the combined future failed
+   */
+  @Override
+  public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+      ExecutionException {
+    boolean interrupted = false;
+    try {
+      while (!this.isStopped()) {
+        try {
+          return this.submit(tasks);
+        } catch (InterruptedException e) {
+          // remember the interrupt, but keep going unless we have been stopped
+          interrupted = true;
+        }
+      }
+    } finally {
+      // restore the interrupted status
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // should only get here if we are interrupted while waiting for a result and have been told to
+    // shutdown by an external source
+    throw new EarlyExitFailure("Interrupted and stopped before computation was complete!");
+  }
+
+  /**
+   * Mark this runner as stopped and shut down the thread pool. Calls after the first are no-ops.
+   * @param why reason for stopping; only used for logging
+   */
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    LOG.info("Shutting down task runner because " + why);
+    // Record the stop before interrupting the workers, so that isStopped() reports it and
+    // submitUninterruptible() can leave its retry loop instead of spinning forever.
+    this.stopped = true;
+    this.writerPool.shutdownNow();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/EarlyExitFailure.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/EarlyExitFailure.java b/src/main/java/org/apache/hbase/index/parallel/EarlyExitFailure.java
new file mode 100644
index 0000000..f2f0dca
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/EarlyExitFailure.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.io.IOException;
+
+/**
+ * Exception denoting a need to early-exit a task (or group of tasks) due to external notification.
+ * <p>
+ * This is an {@link IOException}, so it is a checked exception that callers must handle or declare.
+ */
+@SuppressWarnings("serial")
+public class EarlyExitFailure extends IOException {
+
+ /**
+ * @param msg reason for the early exit; used as the exception's message
+ */
+ public EarlyExitFailure(String msg) {
+ super(msg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/QuickFailingTaskRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/QuickFailingTaskRunner.java b/src/main/java/org/apache/hbase/index/parallel/QuickFailingTaskRunner.java
new file mode 100644
index 0000000..5bc2992
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/QuickFailingTaskRunner.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * {@link TaskRunner} that tries to run every {@link Task} it is given, but gives up as soon as any
+ * one of them fails rather than waiting for the remaining {@link Task}s to complete.
+ */
+public class QuickFailingTaskRunner extends BaseTaskRunner {
+
+  static final Log LOG = LogFactory.getLog(QuickFailingTaskRunner.class);
+
+  /**
+   * @param service thread pool to which {@link Task}s are submitted. This service is then 'owned'
+   *          by <tt>this</tt> and will be shutdown on calls to {@link #stop(String)}.
+   */
+  public QuickFailingTaskRunner(ExecutorService service) {
+    super(service);
+  }
+
+  /**
+   * Combine the per-task futures into one future for the whole batch.
+   * @param futures futures of the individual tasks
+   * @return the future produced by {@link Futures#allAsList(Iterable)} for the passed futures
+   */
+  @Override
+  protected <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures) {
+    ListenableFuture<List<R>> combined = Futures.allAsList(futures);
+    return combined;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/Task.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/Task.java b/src/main/java/org/apache/hbase/index/parallel/Task.java
new file mode 100644
index 0000000..5cb5142
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/Task.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * Like a {@link Callable}, but supports an internal {@link Abortable} that can be checked
+ * periodically to determine if the batch should abort
+ * @param <V> expected result of the task
+ */
+public abstract class Task<V> implements Callable<V> {
+
+ private Abortable batch;
+
+ void setBatchMonitor(Abortable abort) {
+ this.batch = abort;
+ }
+
+ protected boolean isBatchFailed() {
+ return this.batch.isAborted();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/TaskBatch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/TaskBatch.java b/src/main/java/org/apache/hbase/index/parallel/TaskBatch.java
new file mode 100644
index 0000000..d6b7f7f
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/TaskBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * A group of {@link Task}s. The tasks are all bound together using the same {@link Abortable} (
+ * <tt>this</tt>) to ensure that all tasks are aware when any of the other tasks fails.
+ * @param <V> expected result type from all the tasks
+ */
+public class TaskBatch<V> implements Abortable {
+ private static final Log LOG = LogFactory.getLog(TaskBatch.class);
+ private AtomicBoolean aborted = new AtomicBoolean();
+ private List<Task<V>> tasks;
+
+ /**
+ * @param size expected number of tasks
+ */
+ public TaskBatch(int size) {
+ this.tasks = new ArrayList<Task<V>>(size);
+ }
+
+ public void add(Task<V> task) {
+ this.tasks.add(task);
+ task.setBatchMonitor(this);
+ }
+
+ public Collection<Task<V>> getTasks() {
+ return this.tasks;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ if (this.aborted.getAndSet(true)) {
+ return;
+ }
+ LOG.info("Aborting batch of tasks because " + why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted.get();
+ }
+
+ /**
+ * @return the number of tasks assigned to this batch
+ */
+ public int size() {
+ return this.tasks.size();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/TaskRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/TaskRunner.java b/src/main/java/org/apache/hbase/index/parallel/TaskRunner.java
new file mode 100644
index 0000000..9863a08
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/TaskRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.Stoppable;
+
+/**
+ * Runs a {@link TaskBatch} and returns the results of its tasks. A runner can be stopped via
+ * {@link #stop(String)}.
+ */
+public interface TaskRunner extends Stoppable {
+
+  /**
+   * Submit the given tasks to the pool and wait for them to complete or fail.
+   * <p>
+   * To stop any running tasks call {@link #stop(String)} - this will shutdown the thread pool,
+   * causing any pending tasks to be failed early (whose failure will be ignored) and interrupt any
+   * running tasks. It is up to the passed tasks to respect the interrupt notification.
+   * @param tasks to run
+   * @return the result from each task
+   * @throws ExecutionException if any of the tasks fails. Wraps the underlying failure, which can
+   *           be retrieved via {@link ExecutionException#getCause()}.
+   * @throws InterruptedException if the current thread is interrupted while waiting for the batch
+   *           to complete
+   */
+  <R> List<R> submit(TaskBatch<R> tasks) throws
+      ExecutionException, InterruptedException;
+
+  /**
+   * Similar to {@link #submit(TaskBatch)}, but is not interruptible. If an interrupt is found while
+   * waiting for results, we ignore it and only stop if {@link #stop(String)} has been called. On
+   * return from the method, the interrupt status of the thread is restored.
+   * @param tasks to run
+   * @return the result from each task
+   * @throws EarlyExitFailure if there are still tasks to submit to the pool, but there is a stop
+   *           notification
+   * @throws ExecutionException if any of the tasks fails. Wraps the underlying failure, which can
+   *           be retrieved via {@link ExecutionException#getCause()}.
+   */
+  <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+      ExecutionException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/ThreadPoolBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/ThreadPoolBuilder.java b/src/main/java/org/apache/hbase/index/parallel/ThreadPoolBuilder.java
new file mode 100644
index 0000000..506f7da
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/ThreadPoolBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper that collects the settings for a named thread pool: each setting is a configuration key
+ * paired with the default to use when the {@link Configuration} has no value for that key.
+ */
+public class ThreadPoolBuilder {
+
+  private static final Log LOG = LogFactory.getLog(ThreadPoolBuilder.class);
+  private static final long DEFAULT_TIMEOUT = 60;
+  private static final int DEFAULT_MAX_THREADS = 1;// is there a better default?
+  // (configuration key, default value) pairs; null until the matching setter is called
+  private Pair<String, Long> timeout;
+  private Pair<String, Integer> maxThreads;
+  private String name;
+  private Configuration conf;
+
+  /**
+   * @param poolName name of the pool being described
+   * @param conf configuration from which the keyed settings are read
+   */
+  public ThreadPoolBuilder(String poolName, Configuration conf) {
+    this.name = poolName;
+    this.conf = conf;
+  }
+
+  /**
+   * @param confkey configuration key holding the timeout; if <tt>null</tt>, the default is used
+   * @param defaultTime timeout to use when the key has no value; values &lt;= 0 are replaced by
+   *          the built-in default
+   * @return <tt>this</tt> for chaining
+   */
+  public ThreadPoolBuilder setCoreTimeout(String confkey, long defaultTime) {
+    long fallback = defaultTime > 0 ? defaultTime : DEFAULT_TIMEOUT;
+    this.timeout = new Pair<String, Long>(confkey, fallback);
+    return this;
+  }
+
+  /**
+   * Same as {@link #setCoreTimeout(String, long)} with the built-in default timeout.
+   * @param confKey configuration key holding the timeout
+   * @return <tt>this</tt> for chaining
+   */
+  public ThreadPoolBuilder setCoreTimeout(String confKey) {
+    return this.setCoreTimeout(confKey, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * @param confkey configuration key holding the maximum number of threads; if <tt>null</tt>, the
+   *          default is used
+   * @param defaultThreads thread count to use when the key has no value; values &lt;= 0 are
+   *          replaced by the built-in default
+   * @return <tt>this</tt> for chaining
+   */
+  public ThreadPoolBuilder setMaxThread(String confkey, int defaultThreads) {
+    int fallback = defaultThreads > 0 ? defaultThreads : DEFAULT_MAX_THREADS;
+    this.maxThreads = new Pair<String, Integer>(confkey, fallback);
+    return this;
+  }
+
+  String getName() {
+    return this.name;
+  }
+
+  /**
+   * @return the configured maximum number of threads, or the built-in default if
+   *         {@link #setMaxThread(String, int)} was never called
+   */
+  int getMaxThreads() {
+    int threads;
+    if (this.maxThreads == null) {
+      threads = DEFAULT_MAX_THREADS;
+    } else {
+      String confKey = this.maxThreads.getFirst();
+      int fallback = this.maxThreads.getSecond();
+      // without a key there is nothing to look up, so the default stands
+      threads = confKey == null ? fallback : conf.getInt(confKey, fallback);
+    }
+    LOG.trace("Creating pool builder with max " + threads + " threads ");
+    return threads;
+  }
+
+  /**
+   * @return the configured keep-alive time (reported in seconds by the trace log), or the
+   *         built-in default if no timeout was set
+   */
+  long getKeepAliveTime() {
+    long keepAlive;
+    if (this.timeout == null) {
+      keepAlive = DEFAULT_TIMEOUT;
+    } else {
+      String confKey = this.timeout.getFirst();
+      long fallback = this.timeout.getSecond();
+      // without a key there is nothing to look up, so the default stands
+      keepAlive = confKey == null ? fallback : conf.getLong(confKey, fallback);
+    }
+
+    LOG.trace("Creating pool builder with core thread timeout of " + keepAlive + " seconds ");
+    return keepAlive;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/ThreadPoolManager.java b/src/main/java/org/apache/hbase/index/parallel/ThreadPoolManager.java
new file mode 100644
index 0000000..ea9ef89
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/ThreadPoolManager.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Manage access to thread pools
+ */
+public class ThreadPoolManager {
+
+  private static final Log LOG = LogFactory.getLog(ThreadPoolManager.class);
+
+  /**
+   * Get an executor for the given name, based on the passed {@link Configuration}. If a thread pool
+   * already exists with that name, it will be returned.
+   * @param builder describes the pool (name, maximum threads, keep-alive time) to find or create
+   * @param env environment whose shared data is used as the cache of named pools
+   * @return a {@link ThreadPoolExecutor} for the given name. Thread pool that only shuts down when
+   *         there are no more explicit references to it. You do not need to shutdown the threadpool
+   *         on your own - it is managed for you. When you are done, you merely need to release your
+   *         reference. If you do attempt to shutdown the pool, you should be careful to call
+   *         {@link ThreadPoolExecutor#shutdown()} XOR {@link ThreadPoolExecutor#shutdownNow()} -
+   *         extra calls to either can lead to early shutdown of the pool.
+   */
+  public static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
+      RegionCoprocessorEnvironment env) {
+    return getExecutor(builder, env.getSharedData());
+  }
+
+  /**
+   * Look up the pool registered under the builder's name in the given cache, creating and caching
+   * a new one when there is none or the cached one has been shut down. Every call adds one
+   * reference to the returned pool.
+   * @param builder describes the pool to find or create
+   * @param poolCache cache of pools, keyed by pool name
+   * @return the (possibly newly created) reference-counted pool
+   */
+  static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
+      Map<String, Object> poolCache) {
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) poolCache.get(builder.getName());
+    if (pool == null || pool.isTerminating() || pool.isShutdown()) {
+      pool = getDefaultExecutor(builder);
+      LOG.info("Creating new pool for " + builder.getName());
+      poolCache.put(builder.getName(), pool);
+    }
+    ((ShutdownOnUnusedThreadPoolExecutor) pool).addReference();
+
+    return pool;
+  }
+
+  /**
+   * Create a new reference-counted pool from the settings in the builder.
+   * @param builder supplies the pool name, the maximum number of threads and the keep-alive time
+   *          (in seconds)
+   * @return a new pool of daemon threads, with no references yet, whose core threads are allowed
+   *         to time out
+   */
+  private static ShutdownOnUnusedThreadPoolExecutor getDefaultExecutor(ThreadPoolBuilder builder) {
+    int maxThreads = builder.getMaxThreads();
+    long keepAliveTime = builder.getKeepAliveTime();
+
+    // we prefer starting a new thread to queuing (the opposite of the usual ThreadPoolExecutor)
+    // since we are probably writing to a bunch of index tables in this case. Any pending requests
+    // are then queued up in an infinite (Integer.MAX_VALUE) queue. However, we allow core threads
+    // to timeout, so we tune up/down for bursty situations. We could be a bit smarter and more
+    // closely manage the core-thread pool size to handle the bursty traffic (so we can always keep
+    // some core threads on hand, rather than starting from scratch each time), but that would take
+    // even more time. If we shutdown the pool, but are still putting new tasks, we can just do the
+    // usual policy and throw a RejectedExecutionException because we are shutting down anyways and
+    // the worst thing is that this gets unloaded.
+    ShutdownOnUnusedThreadPoolExecutor pool =
+        new ShutdownOnUnusedThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+            Threads.newDaemonThreadFactory(builder.getName() + "-"), builder.getName());
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Thread pool that only shuts down when there are no more explicit references to it. A reference
+   * is added via {@link #addReference()} and released on calls to {@link #shutdown()} or
+   * {@link #shutdownNow()}. Therefore, users should be careful to call {@link #shutdown()} XOR
+   * {@link #shutdownNow()} - extra calls to either can lead to early shutdown of the pool.
+   */
+  private static class ShutdownOnUnusedThreadPoolExecutor extends ThreadPoolExecutor {
+
+    /** Number of outstanding references; the pool really shuts down once this drops to zero. */
+    private final AtomicInteger references;
+    /** Name used in log messages. */
+    private final String poolName;
+
+    public ShutdownOnUnusedThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
+        TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+        String poolName) {
+      super(coreThreads, maxThreads, keepAliveTime, timeUnit, workQueue, threadFactory);
+      this.references = new AtomicInteger();
+      this.poolName = poolName;
+    }
+
+    /** Record one more user of this pool. */
+    public void addReference() {
+      this.references.incrementAndGet();
+    }
+
+    @Override
+    protected void finalize() {
+      // Nobody can reach the pool any more, so the outstanding references will never be released.
+      // Going through the overridden shutdown() would merely decrement the counter and could
+      // leave the pool running, so override the counter and shut the executor down directly.
+      LOG.info("Shutting down pool '" + poolName + "' because no more references");
+      references.set(0);
+      super.shutdown();
+      super.finalize();
+    }
+
+    /**
+     * Release one reference; the underlying pool is only shut down when this was the last one.
+     */
+    @Override
+    public void shutdown() {
+      if (references.decrementAndGet() <= 0) {
+        LOG.debug("Shutting down pool " + this.poolName);
+        super.shutdown();
+      }
+    }
+
+    /**
+     * Release one reference; the underlying pool is only shut down when this was the last one.
+     * @return the tasks that never started if the pool was really shut down, otherwise an empty
+     *         list
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+      if (references.decrementAndGet() <= 0) {
+        LOG.debug("Shutting down pool " + this.poolName + " NOW!");
+        return super.shutdownNow();
+      }
+      return Collections.emptyList();
+    }
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/parallel/WaitForCompletionTaskRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/parallel/WaitForCompletionTaskRunner.java b/src/main/java/org/apache/hbase/index/parallel/WaitForCompletionTaskRunner.java
new file mode 100644
index 0000000..8dea32b
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/parallel/WaitForCompletionTaskRunner.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.parallel;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link TaskRunner} that ensures that all the tasks have been attempted before we return, even
+ * if some of the tasks cause failures.
+ * <p>
+ * Because we wait until the entire batch is complete to see the failure, checking for failure of
+ * the {@link TaskBatch} on the submitted tasks will not help - they will never see the failure of
+ * the other tasks. You will need to provide an external mechanism to propagate the error.
+ * <p>
+ * Does not throw an {@link ExecutionException} if any of the tasks fail.
+ */
+public class WaitForCompletionTaskRunner extends BaseTaskRunner {
+
+  /**
+   * Create a runner on top of the given thread pool.
+   * @param service thread pool to which {@link Task}s are submitted. This service is then 'owned'
+   *          by <tt>this</tt> and will be shutdown on calls to {@link #stop(String)}.
+   */
+  public WaitForCompletionTaskRunner(ExecutorService service) {
+    super(service);
+  }
+
+  /**
+   * Combine the per-task futures into a single future via
+   * {@link Futures#successfulAsList(Iterable)}.
+   * @param futures one future per submitted task
+   * @return the combined future over all the passed futures
+   */
+  @Override
+  public <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures) {
+    ListenableFuture<List<R>> allAttempted = Futures.successfulAsList(futures);
+    return allAttempted;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/scanner/EmptyScanner.java b/src/main/java/org/apache/hbase/index/scanner/EmptyScanner.java
new file mode 100644
index 0000000..140671e
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/scanner/EmptyScanner.java
@@ -0,0 +1,32 @@
+package org.apache.hbase.index.scanner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+
+/**
+ * {@link Scanner} that has no underlying data
+ */
+public class EmptyScanner implements Scanner {
+
+  /**
+   * @return always <tt>null</tt> - there is never a next {@link KeyValue}
+   */
+  @Override
+  public KeyValue next() throws IOException {
+    return null;
+  }
+
+  /**
+   * @param next ignored
+   * @return always <tt>false</tt> - there are never any values left
+   */
+  @Override
+  public boolean seek(KeyValue next) throws IOException {
+    return false;
+  }
+
+  /**
+   * @return always <tt>null</tt> - there is nothing to look at
+   */
+  @Override
+  public KeyValue peek() throws IOException {
+    return null;
+  }
+
+  /** Nothing is held open, so there is nothing to release. */
+  @Override
+  public void close() throws IOException {
+    // noop
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/scanner/FilteredKeyValueScanner.java b/src/main/java/org/apache/hbase/index/scanner/FilteredKeyValueScanner.java
new file mode 100644
index 0000000..0a13ba1
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -0,0 +1,134 @@
+package org.apache.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+import org.apache.hbase.index.covered.KeyValueStore;
+
+/**
+ * Combine a simplified version of the logic in the ScanQueryMatcher and the KeyValueScanner. We can
+ * get away with this here because we are only concerned with a single MemStore for the index; we
+ * don't need to worry about multiple column families or minimizing seeking through file - we just
+ * want to iterate the kvs quickly, in-memory.
+ */
+public class FilteredKeyValueScanner implements KeyValueScanner {
+
+  /** Scanner that supplies the raw, unfiltered key values. */
+  private final KeyValueScanner delegate;
+  /** Filter deciding which key values are served. */
+  private final Filter filter;
+
+  /**
+   * @param filter filter applied to every {@link KeyValue} before it is served
+   * @param store store whose scanner supplies the key values
+   */
+  public FilteredKeyValueScanner(Filter filter, KeyValueStore store) {
+    this(filter, store.getScanner());
+  }
+
+  private FilteredKeyValueScanner(Filter filter, KeyValueScanner delegate) {
+    this.filter = filter;
+    this.delegate = delegate;
+  }
+
+  /**
+   * @return whatever the underlying scanner currently points at; the filter is not consulted here
+   */
+  @Override
+  public KeyValue peek() {
+    return delegate.peek();
+  }
+
+  /**
+   * Same as {@link KeyValueScanner#next()} except that we filter out the next {@link KeyValue}
+   * until we find one that passes the filter.
+   * @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and
+   *         passes all the filters.
+   */
+  @Override
+  public KeyValue next() throws IOException {
+    // position the delegate on the next key value that passes the filter, then pop it
+    seekToNextUnfilteredKeyValue();
+    return delegate.next();
+  }
+
+  /**
+   * Seek the underlying scanner to the given key and then move forward to the first key value that
+   * passes the filter.
+   * @return <tt>false</tt> if the filter is done with all remaining values, the underlying seek
+   *         fails, or no remaining key value passes the filter; <tt>true</tt> otherwise
+   */
+  @Override
+  public boolean seek(KeyValue key) throws IOException {
+    // evaluated left to right: filter exhausted? can the delegate seek? anything unfiltered left?
+    return !filter.filterAllRemaining() && delegate.seek(key) && seekToNextUnfilteredKeyValue();
+  }
+
+  /**
+   * Advance the underlying scanner until it points at a {@link KeyValue} that the filter includes.
+   * @return <tt>true</tt> if the underlying scanner now points at an included key value,
+   *         <tt>false</tt> if it ran out of key values
+   */
+  private boolean seekToNextUnfilteredKeyValue() throws IOException {
+    for (KeyValue candidate = delegate.peek(); candidate != null; candidate = delegate.peek()) {
+      // filter the peeked value to see if it should be served
+      switch (filter.filterKeyValue(candidate)) {
+      // included, so we are done
+      case INCLUDE:
+      case INCLUDE_AND_NEXT_COL:
+        return true;
+      // use a seek hint to find out where we should go
+      case SEEK_NEXT_USING_HINT:
+        delegate.seek(filter.getNextKeyHint(candidate));
+        break;
+      // not included, so step over this key value and try the one after it
+      case SKIP:
+      case NEXT_COL:
+      case NEXT_ROW:
+        delegate.next();
+        break;
+      }
+    }
+    // no more key values, so we are done
+    return false;
+  }
+
+  /**
+   * Reseek the underlying scanner and then move forward to the first key value that passes the
+   * filter. The result of the underlying reseek itself is not used.
+   */
+  @Override
+  public boolean reseek(KeyValue key) throws IOException {
+    delegate.reseek(key);
+    return seekToNextUnfilteredKeyValue();
+  }
+
+  /** Handled as a plain {@link #reseek(KeyValue)}; the two flags are ignored. */
+  @Override
+  public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom) throws IOException {
+    return reseek(kv);
+  }
+
+  @Override
+  public boolean isFileScanner() {
+    return false;
+  }
+
+  @Override
+  public long getSequenceID() {
+    return delegate.getSequenceID();
+  }
+
+  /**
+   * Not supported by this scanner.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
+    throw new UnsupportedOperationException(this.getClass().getName()
+        + " doesn't support checking to see if it should use a scanner!");
+  }
+
+
+  @Override
+  public boolean realSeekDone() {
+    return delegate.realSeekDone();
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    delegate.enforceSeek();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/scanner/Scanner.java b/src/main/java/org/apache/hbase/index/scanner/Scanner.java
new file mode 100644
index 0000000..75c053e
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/scanner/Scanner.java
@@ -0,0 +1,37 @@
+package org.apache.hbase.index.scanner;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Scan the primary table. This is similar to HBase's scanner, but ensures that you will never see
+ * deleted columns/rows
+ */
+public interface Scanner extends Closeable {
+
+  /**
+   * Pop the next value off the scanner.
+   * @return the next keyvalue in the scanner or <tt>null</tt> if there is no next {@link KeyValue}
+   * @throws IOException if there is an underlying error reading the data
+   */
+  KeyValue next() throws IOException;
+
+  /**
+   * Seek to immediately before the given {@link KeyValue}. If that exact {@link KeyValue} is
+   * present in <tt>this</tt>, it will be returned by the next call to {@link #next()}. Otherwise,
+   * the next call to {@link #next()} returns the first {@link KeyValue} after the seeked
+   * {@link KeyValue}.
+   * @param next {@link KeyValue} to seek to. Doesn't need to already be present in <tt>this</tt>
+   * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  boolean seek(KeyValue next) throws IOException;
+
+  /**
+   * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the
+   * scanner.
+   * @return the next {@link KeyValue} or <tt>null</tt> if there are no more values in <tt>this</tt>
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  KeyValue peek() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/scanner/ScannerBuilder.java b/src/main/java/org/apache/hbase/index/scanner/ScannerBuilder.java
new file mode 100644
index 0000000..c6448d6
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/scanner/ScannerBuilder.java
@@ -0,0 +1,147 @@
+package org.apache.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.hbase.index.covered.KeyValueStore;
+import org.apache.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
+import org.apache.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.covered.update.ColumnTracker;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
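+/**
+ * Builds {@link Scanner}s over a {@link KeyValueStore}. Each scanner starts at the row of the
+ * {@link Mutation} passed to the constructor and only serves the {@link KeyValue}s that pass the
+ * filters built for the requested columns.
+ */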
+public class ScannerBuilder {
+
+ private KeyValueStore memstore;
+ private Mutation update;
+
+
+ public ScannerBuilder(KeyValueStore memstore, Mutation update) {
+ this.memstore = memstore;
+ this.update = update;
+ }
+
+ public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
+
+ Filter columnFilters = getColumnFilters(indexedColumns);
+ FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
+
+ // skip to the right TS. This needs to come before the deletes since the deletes will hide any
+ // state that comes before the actual kvs, so we need to capture those TS as they change the row
+ // state.
+ filters.addFilter(new ColumnTrackingNextLargestTimestampFilter(ts, tracker));
+
+ // filter out kvs based on deletes
+ filters.addFilter(new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns)));
+
+ // combine the family filters and the rest of the filters as a
+ return getFilteredScanner(filters);
+ }
+
+ /**
+ * @param columns columns to filter
+ * @return filter that will skip any {@link KeyValue} that doesn't match one of the passed columns
+ * and the
+ */
+ private Filter
+ getColumnFilters(Collection<? extends ColumnReference> columns) {
+ // each column needs to be added as an OR, so we need to separate them out
+ FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+
+ // create a filter that matches each column reference
+ for (ColumnReference ref : columns) {
+ Filter columnFilter =
+ new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(ref.getFamily()));
+ // combine with a match for the qualifier, if the qualifier is a specific qualifier
+ if (!Bytes.equals(ColumnReference.ALL_QUALIFIERS, ref.getQualifier())) {
+ columnFilter =
+ new FilterList(columnFilter, new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+ ref.getQualifier())));
+ }
+ columnFilters.addFilter(columnFilter);
+ }
+ return columnFilters;
+ }
+
+ private Set<ImmutableBytesPtr>
+ getAllFamilies(Collection<? extends ColumnReference> columns) {
+ Set<ImmutableBytesPtr> families = new HashSet<ImmutableBytesPtr>();
+ for (ColumnReference ref : columns) {
+ families.add(new ImmutableBytesPtr(ref.getFamily()));
+ }
+ return families;
+ }
+
+ private Scanner getFilteredScanner(Filter filters) {
+ // create a scanner and wrap it as an iterator, meaning you can only go forward
+ final FilteredKeyValueScanner kvScanner = new FilteredKeyValueScanner(filters, memstore);
+ // seek the scanner to initialize it
+ KeyValue start = KeyValue.createFirstOnRow(update.getRow());
+ try {
+ if (!kvScanner.seek(start)) {
+ return new EmptyScanner();
+ }
+ } catch (IOException e) {
+ // This should never happen - everything should explode if so.
+ throw new RuntimeException(
+ "Failed to seek to first key from update on the memstore scanner!", e);
+ }
+
+ // we have some info in the scanner, so wrap it in an iterator and return.
+ return new Scanner() {
+
+ @Override
+ public KeyValue next() {
+ try {
+ return kvScanner.next();
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading kvs from local memstore!");
+ }
+ }
+
+ @Override
+ public boolean seek(KeyValue next) throws IOException {
+ // check to see if the next kv is after the current key, in which case we can use reseek,
+ // which will be more efficient
+ KeyValue peek = kvScanner.peek();
+ // there is another value and its before the requested one - we can do a reseek!
+ if (peek != null) {
+ int compare = KeyValue.COMPARATOR.compare(peek, next);
+ if (compare < 0) {
+ return kvScanner.reseek(next);
+ } else if (compare == 0) {
+ // we are already at the given key!
+ return true;
+ }
+ }
+ return kvScanner.seek(next);
+ }
+
+ @Override
+ public KeyValue peek() throws IOException {
+ return kvScanner.peek();
+ }
+
+ @Override
+ public void close() {
+ kvScanner.close();
+ }
+ };
+ }
+}
\ No newline at end of file