You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mu...@apache.org on 2014/02/15 01:07:40 UTC
[07/15] Rename package from org.apache.hadoop.hbase.index.* to
org.apache.phoenix.index.* to fix classloader issue causing mutable index
performance regression - https://issues.apache.org/jira/browse/PHOENIX-38
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
new file mode 100644
index 0000000..a28268c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+
+/**
+ * A {@link Scanner} with no underlying data: every read reports that nothing is left.
+ */
+public class EmptyScanner implements Scanner {
+
+  /**
+   * @return always <tt>null</tt>, as there is never a current {@link KeyValue}
+   */
+  @Override
+  public KeyValue peek() throws IOException {
+    return null;
+  }
+
+  /**
+   * @return always <tt>null</tt>, as there is never a next {@link KeyValue}
+   */
+  @Override
+  public KeyValue next() throws IOException {
+    return null;
+  }
+
+  /**
+   * @param next ignored; there is nothing to seek over
+   * @return always <tt>false</tt>, as no values are ever left in this scanner
+   */
+  @Override
+  public boolean seek(KeyValue next) throws IOException {
+    return false;
+  }
+
+  /**
+   * Does nothing; this scanner holds no resources.
+   */
+  @Override
+  public void close() throws IOException {
+    // intentionally empty
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
new file mode 100644
index 0000000..d91aaaf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+
+/**
+ * Combine a simplified version of the logic in the ScanQueryMatcher and the KeyValueScanner. We can get away with this
+ * here because we are only concerned with a single MemStore for the index; we don't need to worry about multiple column
+ * families or minimizing seeking through file - we just want to iterate the kvs quickly, in-memory.
+ */
+public class FilteredKeyValueScanner implements KeyValueScanner {
+
+  /** Scanner that actually holds the {@link KeyValue}s being iterated. */
+  private final KeyValueScanner delegate;
+  /** Filter that decides which of the delegate's {@link KeyValue}s are served. */
+  private final Filter filter;
+
+  /**
+   * @param filter filter applied to every {@link KeyValue} before it is served
+   * @param store store whose scanner (via {@link KeyValueStore#getScanner()}) supplies the data
+   */
+  public FilteredKeyValueScanner(Filter filter, KeyValueStore store) {
+    this(filter, store.getScanner());
+  }
+
+  private FilteredKeyValueScanner(Filter filter, KeyValueScanner delegate) {
+    this.delegate = delegate;
+    this.filter = filter;
+  }
+
+  /**
+   * @return whatever the delegate currently has on top. This value is handed back as-is; the filter
+   *         is only applied by {@link #next()}, {@link #seek(KeyValue)} and {@link #reseek(KeyValue)}.
+   */
+  @Override
+  public KeyValue peek() {
+    return delegate.peek();
+  }
+
+  /**
+   * Same as {@link KeyValueScanner#next()} except that we filter out the next {@link KeyValue} until we find one that
+   * passes the filter.
+   *
+   * @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and passes all the
+   *         filters.
+   */
+  @Override
+  public KeyValue next() throws IOException {
+    seekToNextUnfilteredKeyValue();
+    return delegate.next();
+  }
+
+  /**
+   * Seek the delegate to the given key and then skip forward to the first {@link KeyValue} that the
+   * filter accepts.
+   *
+   * @param key key to seek to
+   * @return <tt>true</tt> if an accepted {@link KeyValue} is now on top of the scanner,
+   *         <tt>false</tt> if the filter is already done, the delegate could not seek, or nothing
+   *         accepted remains
+   */
+  @Override
+  public boolean seek(KeyValue key) throws IOException {
+    if (filter.filterAllRemaining()) { return false; }
+    // see if we can seek to the next key
+    if (!delegate.seek(key)) { return false; }
+
+    return seekToNextUnfilteredKeyValue();
+  }
+
+  /**
+   * Advance the delegate until the {@link KeyValue} on top of it is accepted by the filter.
+   *
+   * @return <tt>true</tt> if an accepted {@link KeyValue} is on top of the delegate, <tt>false</tt>
+   *         if the delegate ran out of data first
+   * @throws IllegalStateException if the filter answers with a return code this scanner does not
+   *           know how to handle
+   */
+  private boolean seekToNextUnfilteredKeyValue() throws IOException {
+    while (true) {
+      KeyValue peeked = delegate.peek();
+      // no more key values, so we are done
+      if (peeked == null) { return false; }
+
+      // filter the peeked value to see if it should be served
+      ReturnCode code = filter.filterKeyValue(peeked);
+      switch (code) {
+      // included, so we are done
+      case INCLUDE:
+      case INCLUDE_AND_NEXT_COL:
+        return true;
+      // not included, so step past this key value and test the following one
+      case SKIP:
+      case NEXT_COL:
+      case NEXT_ROW:
+        delegate.next();
+        break;
+      // use a seek hint to find out where we should go
+      case SEEK_NEXT_USING_HINT: {
+        KeyValue hint = filter.getNextKeyHint(peeked);
+        if (hint == null) {
+          // the filter asked for a hinted seek but supplied no hint; just step forward instead
+          delegate.next();
+        } else {
+          delegate.seek(hint);
+        }
+        break;
+      }
+      default:
+        // failing loudly beats re-peeking the same key value forever on a code we don't handle
+        throw new IllegalStateException("Unexpected filter return code: " + code);
+      }
+    }
+  }
+
+  /**
+   * Reseek the delegate to the given key and then skip forward to the first {@link KeyValue} that
+   * the filter accepts. The delegate's own reseek result is not consulted; the outcome is decided
+   * by what is on top of the delegate afterwards.
+   */
+  @Override
+  public boolean reseek(KeyValue key) throws IOException {
+    this.delegate.reseek(key);
+    return this.seekToNextUnfilteredKeyValue();
+  }
+
+  /**
+   * Implemented as a plain {@link #reseek(KeyValue)}; <tt>forward</tt> and <tt>useBloom</tt> are
+   * ignored.
+   */
+  @Override
+  public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom) throws IOException {
+    return this.reseek(kv);
+  }
+
+  /**
+   * @return always <tt>false</tt>
+   */
+  @Override
+  public boolean isFileScanner() {
+    return false;
+  }
+
+  @Override
+  public long getSequenceID() {
+    return this.delegate.getSequenceID();
+  }
+
+  /**
+   * Not supported by this scanner.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
+    throw new UnsupportedOperationException(this.getClass().getName()
+        + " doesn't support checking to see if it should use a scanner!");
+  }
+
+  @Override
+  public boolean realSeekDone() {
+    return this.delegate.realSeekDone();
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    this.delegate.enforceSeek();
+  }
+
+  @Override
+  public void close() {
+    this.delegate.close();
+  }
+
+  /*
+  NOTE(review): the delegating methods below are deliberately left commented out - presumably they
+  only exist on some versions of the KeyValueScanner interface; confirm before enabling them.
+
+  @Override
+  public boolean backwardSeek(KeyValue arg0) throws IOException {
+    return this.delegate.backwardSeek(arg0);
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    return this.delegate.seekToLastRow();
+  }
+
+  @Override
+  public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
+    return this.delegate.seekToPreviousRow(arg0);
+  }
+  */
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
new file mode 100644
index 0000000..868e892
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Scanner over the primary table. It resembles HBase's scanner, with the added guarantee that
+ * deleted columns/rows are never handed back to the caller.
+ */
+public interface Scanner extends Closeable {
+
+  /**
+   * Look at the {@link KeyValue} on top of <tt>this</tt> without consuming it.
+   * @return the next {@link KeyValue}, or <tt>null</tt> when <tt>this</tt> has no more values
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  KeyValue peek() throws IOException;
+
+  /**
+   * @return the next keyvalue in the scanner, or <tt>null</tt> when there is no next
+   *         {@link KeyValue}
+   * @throws IOException if there is an underlying error reading the data
+   */
+  KeyValue next() throws IOException;
+
+  /**
+   * Position <tt>this</tt> immediately before the given {@link KeyValue}. When that exact
+   * {@link KeyValue} is present, the next call to {@link #next()} returns it; when it is not,
+   * {@link #next()} returns the first {@link KeyValue} that sorts after it.
+   * @param next {@link KeyValue} to seek to. It does not have to be present in <tt>this</tt>
+   * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  boolean seek(KeyValue next) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
new file mode 100644
index 0000000..edc26d5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
+import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ *
+ */
+public class ScannerBuilder {
+
+ private KeyValueStore memstore;
+ private Mutation update;
+
+
+ public ScannerBuilder(KeyValueStore memstore, Mutation update) {
+ this.memstore = memstore;
+ this.update = update;
+ }
+
+ public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
+
+ Filter columnFilters = getColumnFilters(indexedColumns);
+ FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
+
+ // skip to the right TS. This needs to come before the deletes since the deletes will hide any
+ // state that comes before the actual kvs, so we need to capture those TS as they change the row
+ // state.
+ filters.addFilter(new ColumnTrackingNextLargestTimestampFilter(ts, tracker));
+
+ // filter out kvs based on deletes
+ filters.addFilter(new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns)));
+
+ // combine the family filters and the rest of the filters as a
+ return getFilteredScanner(filters);
+ }
+
+ /**
+ * @param columns columns to filter
+ * @return filter that will skip any {@link KeyValue} that doesn't match one of the passed columns
+ * and the
+ */
+ private Filter
+ getColumnFilters(Collection<? extends ColumnReference> columns) {
+ // each column needs to be added as an OR, so we need to separate them out
+ FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+
+ // create a filter that matches each column reference
+ for (ColumnReference ref : columns) {
+ Filter columnFilter =
+ new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(ref.getFamily()));
+ // combine with a match for the qualifier, if the qualifier is a specific qualifier
+ if (!Bytes.equals(ColumnReference.ALL_QUALIFIERS, ref.getQualifier())) {
+ columnFilter =
+ new FilterList(columnFilter, new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+ ref.getQualifier())));
+ }
+ columnFilters.addFilter(columnFilter);
+ }
+ return columnFilters;
+ }
+
+ private Set<ImmutableBytesPtr>
+ getAllFamilies(Collection<? extends ColumnReference> columns) {
+ Set<ImmutableBytesPtr> families = new HashSet<ImmutableBytesPtr>();
+ for (ColumnReference ref : columns) {
+ families.add(new ImmutableBytesPtr(ref.getFamily()));
+ }
+ return families;
+ }
+
+ private Scanner getFilteredScanner(Filter filters) {
+ // create a scanner and wrap it as an iterator, meaning you can only go forward
+ final FilteredKeyValueScanner kvScanner = new FilteredKeyValueScanner(filters, memstore);
+ // seek the scanner to initialize it
+ KeyValue start = KeyValue.createFirstOnRow(update.getRow());
+ try {
+ if (!kvScanner.seek(start)) {
+ return new EmptyScanner();
+ }
+ } catch (IOException e) {
+ // This should never happen - everything should explode if so.
+ throw new RuntimeException(
+ "Failed to seek to first key from update on the memstore scanner!", e);
+ }
+
+ // we have some info in the scanner, so wrap it in an iterator and return.
+ return new Scanner() {
+
+ @Override
+ public KeyValue next() {
+ try {
+ return kvScanner.next();
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading kvs from local memstore!");
+ }
+ }
+
+ @Override
+ public boolean seek(KeyValue next) throws IOException {
+ // check to see if the next kv is after the current key, in which case we can use reseek,
+ // which will be more efficient
+ KeyValue peek = kvScanner.peek();
+ // there is another value and its before the requested one - we can do a reseek!
+ if (peek != null) {
+ int compare = KeyValue.COMPARATOR.compare(peek, next);
+ if (compare < 0) {
+ return kvScanner.reseek(next);
+ } else if (compare == 0) {
+ // we are already at the given key!
+ return true;
+ }
+ }
+ return kvScanner.seek(next);
+ }
+
+ @Override
+ public KeyValue peek() throws IOException {
+ return kvScanner.peek();
+ }
+
+ @Override
+ public void close() {
+ kvScanner.close();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
new file mode 100644
index 0000000..0c06e2b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A simple LRU cache of {@link HTableInterface}s in front of another {@link HTableFactory}. When
+ * requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as last
+ * time, or it may be a new table.
+ * <p>
+ * You <b>should not call {@link HTableInterface#close()} </b> yourself; the cache closes a table
+ * when it evicts it. Along the same lines, do not keep a reference to a table for longer than
+ * necessary - once the table has been evicted, that reference points at a closed table.
+ */
+public class CachingHTableFactory implements HTableFactory {
+
+  private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
+  private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
+  private static final int DEFAULT_CACHE_SIZE = 10;
+
+  /**
+   * LRUMap that closes the {@link HTableInterface} when the table is evicted
+   */
+  @SuppressWarnings("serial")
+  public class HTableInterfaceLRUMap extends LRUMap {
+
+    public HTableInterfaceLRUMap(int cacheSize) {
+      super(cacheSize);
+    }
+
+    /**
+     * Close the table held by the entry that is about to be evicted.
+     * @return always <tt>true</tt>, even when closing the table failed
+     */
+    @Override
+    protected boolean removeLRU(LinkEntry entry) {
+      HTableInterface table = (HTableInterface) entry.getValue();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing connection to table: " + Bytes.toString(table.getTableName())
+            + " because it was evicted from the cache.");
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+        // best effort: the entry is leaving the cache either way, but keep the failure in the log
+        LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+            + " ignoring since being removed from queue.", e);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * @param conf configuration to read from
+   * @return the configured cache size, or the default of 10 if none is set
+   */
+  public static int getCacheSize(Configuration conf) {
+    return conf.getInt(CACHE_SIZE_KEY, DEFAULT_CACHE_SIZE);
+  }
+
+  private final HTableFactory delegate;
+
+  @SuppressWarnings("rawtypes")
+  Map openTables;
+
+  /**
+   * @param tableFactory factory used to create tables that are not in the cache
+   * @param conf configuration from which the cache size is read
+   */
+  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
+    this(tableFactory, getCacheSize(conf));
+  }
+
+  /**
+   * @param factory factory used to create tables that are not in the cache
+   * @param cacheSize size passed to the underlying LRU map
+   */
+  public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+    this.delegate = factory;
+    openTables = new HTableInterfaceLRUMap(cacheSize);
+  }
+
+  /**
+   * @param tablename name of the table to look up
+   * @return the cached table for that name, or a table freshly obtained from the delegate factory
+   *         (and then cached) if none was cached
+   * @throws IOException if the delegate factory fails to create the table
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
+    synchronized (openTables) {
+      HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+      if (table == null) {
+        table = delegate.getTable(tablename);
+        openTables.put(tableBytes, table);
+      }
+      return table;
+    }
+  }
+
+  /**
+   * Shuts down the delegate factory.
+   * NOTE(review): tables still held in the cache are not closed here - confirm whether the owner
+   * of the delegate is expected to release them.
+   */
+  @Override
+  public void shutdown() {
+    this.delegate.shutdown();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
new file mode 100644
index 0000000..33559da
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+public class CoprocessorHTableFactory implements HTableFactory {
+
+ /** Number of milliseconds per-interval to retry zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+ /** Number of retries for zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+ private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+ private CoprocessorEnvironment e;
+
+ public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+ this.e = e;
+ }
+
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+ Configuration conf = e.getConfiguration();
+ // make sure writers fail fast
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
+ }
+ return this.e.getTable(tablename.copyBytesIfNecessary());
+ }
+
+ @Override
+ public void shutdown() {
+ // noop
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
new file mode 100644
index 0000000..bef3d34
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Source of {@link HTableInterface}s, looked up by table name.
+ */
+public interface HTableFactory {
+
+  /**
+   * @param tablename name of the table to obtain
+   * @return a table for the given name
+   * @throws IOException if the table cannot be obtained
+   */
+  HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
+
+  /**
+   * Shut this factory down.
+   */
+  void shutdown();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
new file mode 100644
index 0000000..b6d8d8e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Reference to an HTableInterface. Currently, it's pretty simple in that it is just a wrapper around
+ * the table name.
+ */
+public class HTableInterfaceReference {
+
+  private final ImmutableBytesPtr tableName;
+
+  /**
+   * @param tableName name of the referenced table
+   */
+  public HTableInterfaceReference(ImmutableBytesPtr tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the table name exactly as it was passed to the constructor
+   */
+  public ImmutableBytesPtr get() {
+    return this.tableName;
+  }
+
+  /**
+   * @return the table name as a String, decoded from only the offset/length window of the
+   *         wrapped pointer
+   */
+  public String getTableName() {
+    return Bytes.toString(this.tableName.get(), this.tableName.getOffset(),
+        this.tableName.getLength());
+  }
+
+  @Override
+  public int hashCode() {
+    return tableName.hashCode();
+  }
+
+  /**
+   * Two references are equal when they are of the same class and wrap equal table names.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false;
+    HTableInterfaceReference other = (HTableInterfaceReference)obj;
+    return tableName.equals(other.tableName);
+  }
+
+  /**
+   * @return the same String as {@link #getTableName()}
+   */
+  @Override
+  public String toString() {
+    // honor offset/length; decoding the whole backing array is wrong when the name is a slice
+    return getTableName();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
new file mode 100644
index 0000000..9825c77
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ImmutableBytesPtr extends ImmutableBytesWritable {
+ private int hashCode;
+
+ public ImmutableBytesPtr() {
+ }
+
+ public ImmutableBytesPtr(byte[] bytes) {
+ super(bytes);
+ hashCode = super.hashCode();
+ }
+
+ public ImmutableBytesPtr(ImmutableBytesWritable ibw) {
+ super(ibw.get(), ibw.getOffset(), ibw.getLength());
+ hashCode = super.hashCode();
+ }
+
+ public ImmutableBytesPtr(ImmutableBytesPtr ibp) {
+ super(ibp.get(), ibp.getOffset(), ibp.getLength());
+ hashCode = ibp.hashCode;
+ }
+
+ public ImmutableBytesPtr(byte[] bytes, int offset, int length) {
+ super(bytes, offset, length);
+ hashCode = super.hashCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ ImmutableBytesPtr that = (ImmutableBytesPtr)obj;
+ if (this.hashCode != that.hashCode) return false;
+ if (Bytes.compareTo(this.get(), this.getOffset(), this.getLength(), that.get(), that.getOffset(), that.getLength()) != 0) return false;
+ return true;
+ }
+
+ public void set(ImmutableBytesWritable ptr) {
+ set(ptr.get(),ptr.getOffset(),ptr.getLength());
+ }
+
+ /**
+ * @param b Use passed bytes as backing array for this instance.
+ */
+ @Override
+ public void set(final byte [] b) {
+ super.set(b);
+ hashCode = super.hashCode();
+ }
+
+ /**
+ * @param b Use passed bytes as backing array for this instance.
+ * @param offset
+ * @param length
+ */
+ @Override
+ public void set(final byte [] b, final int offset, final int length) {
+ super.set(b,offset,length);
+ hashCode = super.hashCode();
+ }
+
+ @Override
+ public void readFields(final DataInput in) throws IOException {
+ super.readFields(in);
+ hashCode = super.hashCode();
+ }
+
+ /**
+ * @return the backing byte array, copying only if necessary
+ */
+ public byte[] copyBytesIfNecessary() {
+ return copyBytesIfNecessary(this);
+ }
+
+ public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
+ if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
+ return ptr.get();
+ }
+ return ptr.copyBytes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
new file mode 100644
index 0000000..76ec9ce
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+
+/**
+ * Utility class to help manage indexes
+ */
+public class IndexManagementUtil {
+
+ private IndexManagementUtil() {
+ // private ctor for util classes
+ }
+
+ // Don't rely on statically defined classes constants from classes that may not exist
+ // in earlier HBase versions
+ public static final String INDEX_WAL_EDIT_CODEC_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec";
+ public static final String HLOG_READER_IMPL_KEY = "hbase.regionserver.hlog.reader.impl";
+ public static final String WAL_EDIT_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
+
+ private static final String INDEX_HLOG_READER_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader";
+ private static final Log LOG = LogFactory.getLog(IndexManagementUtil.class);
+
+ public static boolean isWALEditCodecSet(Configuration conf) {
+ // check to see if the WALEditCodec is installed
+ try {
+ // Use reflection to load the IndexedWALEditCodec, since it may not load with an older version
+ // of HBase
+ Class.forName(INDEX_WAL_EDIT_CODEC_CLASS_NAME);
+ } catch (Throwable t) {
+ return false;
+ }
+ if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) {
+ // its installed, and it can handle compression and non-compression cases
+ return true;
+ }
+ return false;
+ }
+
+ public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
+
+ // check to see if the WALEditCodec is installed
+ if (isWALEditCodecSet(conf)) { return; }
+
+ // otherwise, we have to install the indexedhlogreader, but it cannot have compression
+ String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
+ String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
+ try {
+ // Use reflection to load the IndexedHLogReader, since it may not load with an older version
+ // of HBase
+ Class.forName(indexLogReaderName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(codecClass + " is not installed, but "
+ + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+ }
+ if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
+ if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException(
+ "WAL Compression is only supported with " + codecClass
+ + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); }
+ } else {
+ throw new IllegalStateException(codecClass + " is not installed, but "
+ + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+ }
+
+ }
+
+ public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+ final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+ .size());
+ for (KeyValue kv : pendingUpdates) {
+ // create new pointers to each part of the kv
+ ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
+ ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
+ kv.getQualifierLength());
+ ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ valueMap.put(new ReferencingColumn(family, qual), value);
+ }
+ return new ValueGetter() {
+ @Override
+ public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+ return valueMap.get(ReferencingColumn.wrap(ref));
+ }
+ };
+ }
+
+ private static class ReferencingColumn {
+ ImmutableBytesPtr family;
+ ImmutableBytesPtr qual;
+
+ static ReferencingColumn wrap(ColumnReference ref) {
+ ImmutableBytesPtr family = new ImmutableBytesPtr(ref.getFamily());
+ ImmutableBytesPtr qual = new ImmutableBytesPtr(ref.getQualifier());
+ return new ReferencingColumn(family, qual);
+ }
+
+ public ReferencingColumn(ImmutableBytesPtr family, ImmutableBytesPtr qual) {
+ this.family = family;
+ this.qual = qual;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((family == null) ? 0 : family.hashCode());
+ result = prime * result + ((qual == null) ? 0 : qual.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ ReferencingColumn other = (ReferencingColumn)obj;
+ if (family == null) {
+ if (other.family != null) return false;
+ } else if (!family.equals(other.family)) return false;
+ if (qual == null) {
+ if (other.qual != null) return false;
+ } else if (!qual.equals(other.qual)) return false;
+ return true;
+ }
+ }
+
+ public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) {
+ return new LazyValueGetter(scanner, currentRow);
+ }
+
+ /**
+ * check to see if the kvs in the update match any of the passed columns. Generally, this is useful to for an index
+ * codec to determine if a given update should even be indexed. This assumes that for any index, there are going to
+ * small number of columns, versus the number of kvs in any one batch.
+ */
+ public static boolean updateMatchesColumns(Collection<KeyValue> update, List<ColumnReference> columns) {
+ // check to see if the kvs in the new update even match any of the columns requested
+ // assuming that for any index, there are going to small number of columns, versus the number of
+ // kvs in any one batch.
+ boolean matches = false;
+ outer: for (KeyValue kv : update) {
+ for (ColumnReference ref : columns) {
+ if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+ matches = true;
+ // if a single column matches a single kv, we need to build a whole scanner
+ break outer;
+ }
+ }
+ }
+ return matches;
+ }
+
+ /**
+ * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful to for an index
+ * codec to determine if a given update should even be indexed. This assumes that for any index, there are going to
+ * small number of kvs, versus the number of columns in any one batch.
+ * <p>
+ * This employs the same logic as {@link #updateMatchesColumns(Collection, List)}, but is flips the iteration logic
+ * to search columns before kvs.
+ */
+ public static boolean columnMatchesUpdate(List<ColumnReference> columns, Collection<KeyValue> update) {
+ boolean matches = false;
+ outer: for (ColumnReference ref : columns) {
+ for (KeyValue kv : update) {
+ if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+ matches = true;
+ // if a single column matches a single kv, we need to build a whole scanner
+ break outer;
+ }
+ }
+ }
+ return matches;
+ }
+
+ public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
+ Scan s = new Scan();
+ s.setRaw(true);
+ // add the necessary columns to the scan
+ for (Iterable<? extends ColumnReference> refs : refsArray) {
+ for (ColumnReference ref : refs) {
+ s.addFamily(ref.getFamily());
+ }
+ }
+ s.setMaxVersions();
+ return s;
+ }
+
+ /**
+ * Propagate the given failure as a generic {@link IOException}, if it isn't already
+ *
+ * @param e
+ * reason indexing failed. If ,tt>null</tt>, throws a {@link NullPointerException}, which should unload
+ * the coprocessor.
+ */
+ public static void rethrowIndexingException(Throwable e) throws IOException {
+ try {
+ throw e;
+ } catch (IOException e1) {
+ LOG.info("Rethrowing " + e);
+ throw e1;
+ } catch (Throwable e1) {
+ LOG.info("Rethrowing " + e1 + " as a " + IndexBuildingFailureException.class.getSimpleName());
+ throw new IndexBuildingFailureException("Failed to build index for unexpected reason!", e1);
+ }
+ }
+
+ public static void setIfNotSet(Configuration conf, String key, int value) {
+ if (conf.get(key) == null) {
+ conf.setInt(key, value);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
new file mode 100644
index 0000000..253fd0d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+ /**
+  * {@link KeyValue} that carries an index table name and the {@link Mutation} destined for that table.
+  * Serialization goes through {@link KeyValueCodec}.
+  */
+ public class IndexedKeyValue extends KeyValue {
+     /** Combine the table name's hash with the hash of the mutation's row. */
+     private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) {
+         final int prime = 31;
+         int result = 1;
+         result = prime * result + indexTableName.hashCode();
+         result = prime * result + Arrays.hashCode(mutation.getRow());
+         return result;
+     }
+
+     private ImmutableBytesPtr indexTableName;
+     private Mutation mutation;
+     // optimization check to ensure that batches don't get replayed to the index more than once
+     private boolean batchFinished = false;
+     // cached; recomputed whenever table name and mutation are assigned
+     private int hashCode;
+
+     /**
+      * Create an empty instance; the table name and mutation stay <tt>null</tt> until
+      * {@link #readFields(DataInput)} is called.
+      */
+     public IndexedKeyValue() {}
+
+     /**
+      * @param bs name of the index table
+      * @param mutation mutation to apply to that table
+      */
+     public IndexedKeyValue(byte[] bs, Mutation mutation) {
+         this.indexTableName = new ImmutableBytesPtr(bs);
+         this.mutation = mutation;
+         this.hashCode = calcHashCode(indexTableName, mutation);
+     }
+
+     /**
+      * @return the backing array of the index table name
+      */
+     public byte[] getIndexTable() {
+         return this.indexTableName.get();
+     }
+
+     public Mutation getMutation() {
+         return mutation;
+     }
+
+     /**
+      * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link HLog#METAFAMILY} so it
+      * isn't replayed via the normal replay mechanism
+      */
+     @Override
+     public boolean matchingFamily(final byte[] family) {
+         return Bytes.equals(family, HLog.METAFAMILY);
+     }
+
+     @Override
+     public String toString() {
+         return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
+     }
+
+     /**
+      * This is a very heavy-weight operation and should only be done when absolutely necessary - it does a full
+      * serialization of the underlying mutation to compare the underlying data.
+      */
+     @Override
+     public boolean equals(Object obj) {
+         if (obj == null) return false;
+         if (this == obj) return true;
+         if (getClass() != obj.getClass()) return false;
+         IndexedKeyValue other = (IndexedKeyValue)obj;
+         if (hashCode() != other.hashCode()) return false;
+         // An instance that has not been populated yet has no table name or mutation; compare by reference
+         // in that case so equals never throws a NullPointerException.
+         if (this.indexTableName == null || this.mutation == null
+                 || other.indexTableName == null || other.mutation == null) {
+             return this.indexTableName == other.indexTableName && this.mutation == other.mutation;
+         }
+         if (!other.indexTableName.equals(this.indexTableName)) return false;
+         byte[] current = this.getMutationBytes();
+         byte[] otherMutation = other.getMutationBytes();
+         return Bytes.equals(current, otherMutation);
+     }
+
+     /**
+      * Serialize the mutation into a fresh byte array.
+      *
+      * @throws IllegalArgumentException if the mutation fails to write itself
+      */
+     private byte[] getMutationBytes() {
+         // A ByteArrayOutputStream holds no resource to release, so no flush or close is needed.
+         ByteArrayOutputStream bos = new ByteArrayOutputStream();
+         try {
+             this.mutation.write(new DataOutputStream(bos));
+         } catch (IOException e) {
+             throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+         }
+         return bos.toByteArray();
+     }
+
+     @Override
+     public int hashCode() {
+         return hashCode;
+     }
+
+     @Override
+     public void write(DataOutput out) throws IOException {
+         KeyValueCodec.write(out, this);
+     }
+
+     /**
+      * Internal write of the underlying data for the entry - this does not do any special prefixing. Writing should
+      * be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
+      * {@link IndexedKeyValue}s.
+      *
+      * @param out
+      *            to write data to. Does not close or flush the passed object.
+      * @throws IOException
+      *             if there is a problem writing the underlying data
+      */
+     void writeData(DataOutput out) throws IOException {
+         Bytes.writeByteArray(out, this.indexTableName.get());
+         // the class name lets readFields instantiate the right Mutation subclass
+         out.writeUTF(this.mutation.getClass().getName());
+         this.mutation.write(out);
+     }
+
+     /**
+      * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. It's the
+      * complement to {@link #writeData(DataOutput)}.
+      */
+     @SuppressWarnings("javadoc")
+     @Override
+     public void readFields(DataInput in) throws IOException {
+         this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
+         Class<? extends Mutation> clazz;
+         try {
+             // asSubclass rejects any class name in the stream that is not a Mutation
+             clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
+             this.mutation = clazz.newInstance();
+             this.mutation.readFields(in);
+             this.hashCode = calcHashCode(indexTableName, mutation);
+         } catch (ClassNotFoundException e) {
+             throw new IOException(e);
+         } catch (InstantiationException e) {
+             throw new IOException(e);
+         } catch (IllegalAccessException e) {
+             throw new IOException(e);
+         }
+     }
+
+     public boolean getBatchFinished() {
+         return this.batchFinished;
+     }
+
+     public void markBatchFinished() {
+         this.batchFinished = true;
+     }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
new file mode 100644
index 0000000..3815937
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
+ */
+public class KeyValueCodec {
+
+ /**
+ * KeyValue length marker specifying that its actually an {@link IndexedKeyValue} rather than a
+ * regular {@link KeyValue}.
+ */
+ public static final int INDEX_TYPE_LENGTH_MARKER = -1;
+
+ /**
+ * Read a {@link List} of {@link KeyValue} from the input stream - may contain regular
+ * {@link KeyValue}s or {@link IndexedKeyValue}s.
+ * @param in to read from
+ * @return the next {@link KeyValue}s
+ * @throws IOException if the next {@link KeyValue} cannot be read
+ */
+ public static List<KeyValue> readKeyValues(DataInput in) throws IOException {
+ int size = in.readInt();
+ if (size == 0) {
+ return Collections.<KeyValue>emptyList();
+ }
+ List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+ for (int i = 0; i < size; i++) {
+ kvs.add(readKeyValue(in));
+ }
+ return kvs;
+ }
+
+ /**
+ * Read a single {@link KeyValue} from the input stream - may either be a regular {@link KeyValue}
+ * or an {@link IndexedKeyValue}.
+ * @param in to read from
+ * @return the next {@link KeyValue}, if one is available
+ * @throws IOException if the next {@link KeyValue} cannot be read
+ */
+ public static KeyValue readKeyValue(DataInput in) throws IOException {
+ int length = in.readInt();
+ KeyValue kv;
+ // its a special IndexedKeyValue
+ if (length == INDEX_TYPE_LENGTH_MARKER) {
+ kv = new IndexedKeyValue();
+ kv.readFields(in);
+ } else {
+ kv = new KeyValue();
+ kv.readFields(length, in);
+ }
+ return kv;
+ }
+
+ /**
+ * Write a {@link KeyValue} or an {@link IndexedKeyValue} to the output stream. These can be read
+ * back via {@link #readKeyValue(DataInput)} or {@link #readKeyValues(DataInput)}.
+ * @param out to write to
+ * @param kv {@link KeyValue} to which to write
+ * @throws IOException if there is an error writing
+ */
+ public static void write(DataOutput out, KeyValue kv) throws IOException {
+ if (kv instanceof IndexedKeyValue) {
+ out.writeInt(INDEX_TYPE_LENGTH_MARKER);
+ ((IndexedKeyValue) kv).writeData(out);
+ } else {
+ kv.write(out);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
new file mode 100644
index 0000000..d7fef5e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.IndexWriteException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write the index updates to the index tables
+ */
+public interface IndexCommitter extends Stoppable {
+
+ void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+ throws IndexWriteException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
new file mode 100644
index 0000000..5964647
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Handle failures to write to the index tables.
+ */
+public interface IndexFailurePolicy extends Stoppable {
+
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
+
+ /**
+ * Handle the failure of the attempted index updates
+ * @param attempted map of index table -> mutations to apply
+ * @param cause reason why there was a failure
+ * @throws IOException
+ */
+ public void
+ handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
new file mode 100644
index 0000000..30797b2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.IndexWriteException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
+ * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
+ * threads, so it will not block the region from shutting down.
+ */
+public class IndexWriter implements Stoppable {
+
+ private static final Log LOG = LogFactory.getLog(IndexWriter.class);
+ private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+ public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+ private IndexCommitter writer;
+ private IndexFailurePolicy failurePolicy;
+
+ /**
+ * @throws IOException if the {@link IndexWriter} or {@link IndexFailurePolicy} cannot be
+ * instantiated
+ */
+ public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
+ this(getCommitter(env), getFailurePolicy(env), env, name);
+ }
+
+ public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+ Configuration conf = env.getConfiguration();
+ try {
+ IndexCommitter committer =
+ conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+ IndexCommitter.class).newInstance();
+ return committer;
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
+ throws IOException {
+ Configuration conf = env.getConfiguration();
+ try {
+ IndexFailurePolicy committer =
+ conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
+ IndexFailurePolicy.class).newInstance();
+ return committer;
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
+ * to be fully setup before calling.
+ * @param committer
+ * @param policy
+ * @param env
+ */
+ public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+ RegionCoprocessorEnvironment env, String name) {
+ this(committer, policy);
+ this.writer.setup(this, env, name);
+ this.failurePolicy.setup(this, env);
+ }
+
+ /**
+ * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
+ * {@link IndexFailurePolicy}.
+ * @param committer to write updates
+ * @param policy to handle failures
+ */
+ IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
+ this.writer = committer;
+ this.failurePolicy = policy;
+ }
+
+ /**
+ * Write the mutations to their respective table.
+ * <p>
+ * This method is blocking and could potentially cause the writer to block for a long time as we
+ * write the index updates. When we return depends on the specified {@link IndexCommitter}.
+ * <p>
+ * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
+ * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
+ * which ensures that the server crashes when an index write fails, ensuring that we get WAL
+ * replay of the index edits.
+ * @param indexUpdates Updates to write
+ * @throws IOException
+ */
+ public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+ // convert the strings to htableinterfaces to which we can talk and group by TABLE
+ Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+ writeAndKillYourselfOnFailure(toWrite);
+ }
+
+ /**
+ * see {@link #writeAndKillYourselfOnFailure(Collection)}.
+ * @param toWrite
+ * @throws IOException
+ */
+ public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+ try {
+ write(toWrite);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Done writing all index updates!\n\t" + toWrite);
+ }
+ } catch (Exception e) {
+ this.failurePolicy.handleFailure(toWrite, e);
+ }
+ }
+
+ /**
+ * Write the mutations to their respective table.
+ * <p>
+ * This method is blocking and could potentially cause the writer to block for a long time as we
+ * write the index updates. We only return when either:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes
+ * to ensure a timely recovery of the failed index writes.
+ * @param toWrite Updates to write
+ * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
+ * stop early depends on the {@link IndexCommitter}.
+ */
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+ write(resolveTableReferences(toWrite));
+ }
+
+ /**
+ * see {@link #write(Collection)}
+ * @param toWrite
+ * @throws IndexWriteException
+ */
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+ throws IndexWriteException {
+ this.writer.write(toWrite);
+ }
+
+
+ /**
+ * Convert the passed index updates to {@link HTableInterfaceReference}s.
+ * @param indexUpdates from the index builder
+ * @return pairs that can then be written by an {@link IndexWriter}.
+ */
+ public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+ Collection<Pair<Mutation, byte[]>> indexUpdates) {
+ Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+ .<HTableInterfaceReference, Mutation> create();
+ // simple map to make lookups easy while we build the map of tables to create
+ Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
+ new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(updates.size());
+ for (Pair<Mutation, byte[]> entry : indexUpdates) {
+ byte[] tableName = entry.getSecond();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+ HTableInterfaceReference table = tables.get(ptr);
+ if (table == null) {
+ table = new HTableInterfaceReference(ptr);
+ tables.put(ptr, table);
+ }
+ updates.put(table, entry.getFirst());
+ }
+
+ return updates;
+ }
+
+ @Override
+ public void stop(String why) {
+ if (!this.stopped.compareAndSet(false, true)) {
+ // already stopped
+ return;
+ }
+ LOG.debug("Stopping because " + why);
+ this.writer.stop(why);
+ this.failurePolicy.stop(why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped.get();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
new file mode 100644
index 0000000..db95970
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+
+import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+/**
+ * Static helpers for setting up the {@link HTableFactory} used when writing index updates.
+ */
+public class IndexWriterUtils {
+
+ private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
+
+ /**
+  * Maximum number of threads to allow per-table when writing. Each writer thread (from
+  * {@link ParallelWriterIndexCommitter#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a
+  * single HTable. However, each table is backed by a threadpool to manage the updates to that
+  * table. This specifies the number of threads to allow in each of those tables. Generally, you
+  * shouldn't need to change this, unless you have a small number of indexes to which most of the
+  * writes go. Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
+  * <p>
+  * For tables to which there are not a lot of writes, the thread pool automatically will decrease
+  * the number of threads to one (though it can burst up to the specified max for any given
+  * table), so increasing this to meet the max case is reasonable.
+  * <p>
+  * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
+  * underlying pool works is such that it does direct hand-off of tasks to threads. This works
+  * fine because HTables are assumed to work in a single-threaded context, so we never get more
+  * threads than regionservers. In a multi-threaded context, we can easily grow to more than that
+  * number of threads. Currently, HBase doesn't support a custom thread-pool to back the HTable
+  * via the coprocessor hooks, so we can't modify this behavior.
+  */
+ private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+     "index.writer.threads.pertable.max";
+ private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+
+ /** Configuration key that HBase uses to set the max number of threads for an HTable */
+ public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+
+ private IndexWriterUtils() {
+   // private ctor for utilities
+ }
+
+ /**
+  * Create the default {@link HTableFactory} for writing index updates from a coprocessor.
+  * <p>
+  * The per-table thread limit is read from the environment's configuration
+  * (<tt>index.writer.threads.pertable.max</tt>, defaulting to
+  * {@value #DEFAULT_NUM_PER_TABLE_THREADS}) and handed to
+  * {@link IndexManagementUtil#setIfNotSet} for {@link #HTABLE_THREAD_KEY} on that same
+  * configuration object. NOTE(review): going by the helper's name an already configured value is
+  * left untouched - confirm against {@link IndexManagementUtil}.
+  * @param env environment supplying the configuration and used to build the factory
+  * @return a new {@link CoprocessorHTableFactory} backed by <tt>env</tt>
+  */
+ public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
+   // create a simple delegate factory, setup the way we need
+   Configuration conf = env.getConfiguration();
+   // set the number of threads allowed per table.
+   int htableThreads =
+       conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
+         IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+   // only build the message when it will actually be logged
+   if (LOG.isTraceEnabled()) {
+     LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
+   }
+   IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+   return new CoprocessorHTableFactory(env);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
new file mode 100644
index 0000000..0b84cdf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Naive failure policy - kills the server on which it resides.
+ * <p>
+ * On a failed index write the configured {@link Abortable} is asked to abort; if that request
+ * itself throws, the JVM is terminated via {@link System#exit(int)}.
+ */
+public class KillServerOnFailurePolicy implements IndexFailurePolicy {
+
+ private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
+ /** Target of the abort request issued from {@link #handleFailure} */
+ private Abortable abortable;
+ /** Parent whose stopped state is reported by {@link #isStopped()} */
+ private Stoppable stoppable;
+
+ /**
+  * Configure this policy from the coprocessor environment, using the environment's region
+  * server services as the {@link Abortable}.
+  * @param parent parent whose stopped state this policy reports
+  * @param env environment from which the region server services are obtained
+  */
+ @Override
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+   setup(parent, env.getRegionServerServices());
+ }
+
+ /**
+  * Configure this policy with an explicit abort target.
+  * @param parent parent whose stopped state this policy reports
+  * @param abort the instance to abort when an index write fails
+  */
+ public void setup(Stoppable parent, Abortable abort) {
+   this.stoppable = parent;
+   this.abortable = abort;
+ }
+
+ /**
+  * No-op: this policy holds no resources of its own.
+  * @param why ignored
+  */
+ @Override
+ public void stop(String why) {
+   // noop
+ }
+
+ /**
+  * @return the stopped state of the parent passed to <tt>setup</tt>
+  */
+ @Override
+ public boolean isStopped() {
+   return this.stoppable.isStopped();
+ }
+
+ /**
+  * Log the failure and ask the {@link Abortable} to abort. If the abort request throws, the
+  * failure is logged at fatal level and the JVM exits with status 1.
+  * @param attempted the index updates that were being written; not inspected here
+  * @param cause the failure that triggered this policy
+  * @throws IOException declared by the interface; this implementation does not throw it itself
+  */
+ @Override
+ public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+     Exception cause) throws IOException {
+   // cleanup resources
+   this.stop("Killing ourselves because of an error:" + cause);
+   // notify the regionserver of the failure
+   String msg =
+       "Could not update the index table, killing server region because couldn't write to an index table";
+   LOG.error(msg, cause);
+   try {
+     this.abortable.abort(msg, cause);
+   } catch (Exception e) {
+     // include the abort failure itself, otherwise the reason for the hard kill is lost
+     LOG.fatal("Couldn't abort this server to preserve index writes, "
+         + "attempting to hard kill the server", e);
+     System.exit(1);
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..55695ff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
+ * any of the index updates fails. Completion is determined by the following criteria:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
+ * ensure a timely recovery of the failed index writes.
+ */
+public class ParallelWriterIndexCommitter implements IndexCommitter {
+
+ public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+ private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+ private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+     "index.writer.threads.keepalivetime";
+ private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+ private HTableFactory factory;
+ private Stoppable stopped;
+ private QuickFailingTaskRunner pool;
+
+ /**
+  * Configure this committer from the coprocessor environment: the default delegate table
+  * factory, an executor obtained from the {@link ThreadPoolManager} (sized via
+  * {@link #NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) and the table cache size read from the
+  * configuration.
+  * @param parent the owning writer, used as the {@link Stoppable} that tracks stopped state
+  * @param env environment supplying the configuration and region server services
+  * @param name name handed to the {@link ThreadPoolBuilder}
+  */
+ @Override
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+   Configuration conf = env.getConfiguration();
+   setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+     ThreadPoolManager.getExecutor(
+       new ThreadPoolBuilder(name, conf).
+         setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+           DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+         setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+     env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+ }
+
+ /**
+  * Setup <tt>this</tt>.
+  * <p>
+  * Exposed for TESTING
+  * @param factory delegate used to create tables; wrapped in a {@link CachingHTableFactory}
+  * @param pool executor on which the per-table writes are run
+  * @param abortable currently unused by this implementation
+  * @param stop source of the stopped state reported by {@link #isStopped()}
+  * @param cacheSize size passed to the {@link CachingHTableFactory}
+  */
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+     int cacheSize) {
+   this.factory = new CachingHTableFactory(factory, cacheSize);
+   this.pool = new QuickFailingTaskRunner(pool);
+   this.stopped = stop;
+ }
+
+ /**
+  * Write each table's batch of mutations as its own task and block until all tasks finish or
+  * one of them fails.
+  * @param toWrite the mutations to write, keyed by the index table they target
+  * @throws SingleIndexWriteFailureException if any of the writes fails or the batch is aborted
+  */
+ @Override
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+     throws SingleIndexWriteFailureException {
+   /*
+    * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
+    * writes in parallel to each index table, so each table gets its own task and is submitted to
+    * the pool. Where it gets tricky is that we want to block the calling thread until one of two
+    * things happens: (1) all index tables get successfully updated, or (2) any one of the index
+    * table writes fail; in either case, we should return as quickly as possible. We get a little
+    * more complicated in that if we do get a single failure, but any of the index writes hasn't
+    * been started yet (it's been queued up, but not submitted to a thread) we want that task to
+    * fail immediately as we know that write is a waste and will need to be replayed anyways.
+    */
+
+   Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+   TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+   for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+     // get the mutations for each table. When the multimap is list-backed we reuse its list to
+     // save doing a complete copy over of all the index updates for each table.
+     final List<Mutation> mutations = asList(entry.getValue());
+     final HTableInterfaceReference tableReference = entry.getKey();
+     /*
+      * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+      * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+      * running thread. The former will only work if we are not in the midst of writing the current
+      * batch to the table, though we do check these status variables before starting and before
+      * writing the batch. The latter usage, interrupting the thread, will work in the previous
+      * situations as well as at some points while writing the batch, depending on the underlying
+      * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when it
+      * supports an interrupt).
+      */
+     tasks.add(new Task<Void>() {
+
+       /**
+        * Do the actual write to the index table. We don't need to worry about closing the table
+        * because that is handled by the {@link CachingHTableFactory}.
+        */
+       @Override
+       public Void call() throws Exception {
+         // this may have been queued, so another task in front of us may have failed, so we
+         // should early exit, if that's the case
+         throwFailureIfDone();
+
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+         }
+         try {
+           HTableInterface table = factory.getTable(tableReference.get());
+           // check again: the batch may have failed while we were obtaining the table
+           throwFailureIfDone();
+           table.batch(mutations);
+         } catch (SingleIndexWriteFailureException e) {
+           throw e;
+         } catch (IOException e) {
+           throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+         } catch (InterruptedException e) {
+           // reset the interrupt status on the thread
+           Thread.currentThread().interrupt();
+           throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+         }
+         return null;
+       }
+
+       /** Fail this task if the batch has already failed or this thread was interrupted. */
+       private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+         if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
+           throw new SingleIndexWriteFailureException(
+               "Pool closed, not attempting to write to the index!", null);
+         }
+
+       }
+     });
+   }
+
+   // actually submit the tasks to the pool and wait for them to finish/fail
+   try {
+     pool.submitUninterruptible(tasks);
+   } catch (EarlyExitFailure e) {
+     propagateFailure(e);
+   } catch (ExecutionException e) {
+     LOG.error("Found a failed index update!");
+     propagateFailure(e.getCause());
+   }
+
+ }
+
+ /**
+  * View the mutations for one table as a {@link List}, which is what the batch write needs.
+  * The collection is returned as-is when it already is a list; otherwise it is copied, so a
+  * {@link Multimap} that is not list-backed no longer triggers a {@link ClassCastException}.
+  */
+ private static List<Mutation> asList(Collection<Mutation> mutations) {
+   if (mutations instanceof List) {
+     return (List<Mutation>) mutations;
+   }
+   // fully qualified so that no additional import is required
+   return new java.util.ArrayList<Mutation>(mutations);
+ }
+
+ /**
+  * Rethrow the failure as a {@link SingleIndexWriteFailureException}: as-is when it already is
+  * one, otherwise wrapped as the cause of a new one.
+  */
+ private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+   try {
+     throw throwable;
+   } catch (SingleIndexWriteFailureException e1) {
+     throw e1;
+   } catch (Throwable e1) {
+     throw new SingleIndexWriteFailureException(
+         "Got an abort notification while writing to the index!", e1);
+   }
+
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
+  * by the external {@link Stoppable}. This call does not delegate the stop down to the
+  * {@link Stoppable} passed to <tt>setup</tt>.
+  * @param why the reason for stopping
+  */
+ @Override
+ public void stop(String why) {
+   LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+   this.pool.stop(why);
+   this.factory.shutdown();
+ }
+
+ /**
+  * @return the stopped state of the external {@link Stoppable} passed to <tt>setup</tt>
+  */
+ @Override
+ public boolean isStopped() {
+   return this.stopped.isStopped();
+ }
+}
\ No newline at end of file