You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:44 UTC
[43/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java b/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
new file mode 100644
index 0000000..acb78be
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A bounded LRU cache of {@link HTableInterface}s, keyed by table name. When requesting an
+ * {@link HTableInterface} via {@link #getTable}, you may get the same table as last time, or it may
+ * be a new table.
+ * <p>
+ * The cache closes its own tables: a table is closed when it is evicted because the cache is full,
+ * and every remaining table is closed when the factory is {@link #shutdown() shut down}. You
+ * <b>should not call {@link HTableInterface#close()}</b> on a table obtained from this factory.
+ * Along the same lines, do not keep a reference to a table for longer than necessary - once it is
+ * evicted it is closed, even if you still hold it.
+ * <p>
+ * All access to the cache is synchronized on the underlying map.
+ */
+public class CachingHTableFactory implements HTableFactory {
+
+  /**
+   * LRUMap that closes the {@link HTableInterface} when the table is evicted
+   */
+  @SuppressWarnings("serial")
+  public class HTableInterfaceLRUMap extends LRUMap {
+
+    public HTableInterfaceLRUMap(int cacheSize) {
+      super(cacheSize);
+    }
+
+    /**
+     * Close the evicted table before it is dropped from the map. A failure to close is logged and
+     * otherwise ignored, so eviction always proceeds.
+     */
+    @Override
+    protected boolean removeLRU(LinkEntry entry) {
+      HTableInterface table = (HTableInterface) entry.getValue();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing connection to table: " + Bytes.toString(table.getTableName())
+            + " because it was evicted from the cache.");
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+            + " ignoring since being removed from queue.", e);
+      }
+      // true: let the LRUMap remove this entry to make room for the new one
+      return true;
+    }
+  }
+
+  /**
+   * @param conf configuration to read
+   * @return the configured cache size (<tt>index.tablefactory.cache.size</tt>), or 10 if unset
+   */
+  public static int getCacheSize(Configuration conf) {
+    return conf.getInt(CACHE_SIZE_KEY, DEFAULT_CACHE_SIZE);
+  }
+
+  private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
+  private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
+  private static final int DEFAULT_CACHE_SIZE = 10;
+
+  private final HTableFactory delegate;
+
+  @SuppressWarnings("rawtypes")
+  Map openTables;
+
+  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
+    this(tableFactory, getCacheSize(conf));
+  }
+
+  public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+    this.delegate = factory;
+    openTables = new HTableInterfaceLRUMap(cacheSize);
+  }
+
+  /**
+   * Return the cached table for <tt>tablename</tt>. If it is not cached, create it through the
+   * delegate factory and cache it. Adding a new entry to a full cache evicts, and thereby closes,
+   * the least recently used table.
+   *
+   * @param tablename name of the table to get
+   * @return a cached or newly created table; callers must not close it themselves
+   * @throws IOException if the delegate factory fails to create the table
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    // key the map on a separate pointer object, so a later set(...) on the caller's pointer cannot
+    // change the key stored in the map
+    ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
+    synchronized (openTables) {
+      HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+      if (table == null) {
+        table = delegate.getTable(tablename);
+        openTables.put(tableBytes, table);
+      }
+      return table;
+    }
+  }
+
+  /**
+   * Close every table still held in the cache, empty the cache, and then shut down the delegate
+   * factory. Without this, tables that were never evicted would never be closed. Failures to close
+   * individual tables are logged and do not prevent the delegate from being shut down.
+   */
+  @Override
+  public void shutdown() {
+    try {
+      synchronized (openTables) {
+        for (Object value : openTables.values()) {
+          HTableInterface table = (HTableInterface) value;
+          try {
+            table.close();
+          } catch (IOException e) {
+            LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+                + " ignoring since the factory is shutting down.", e);
+          }
+        }
+        openTables.clear();
+      }
+    } finally {
+      this.delegate.shutdown();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java b/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
new file mode 100644
index 0000000..e773105
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
@@ -0,0 +1,50 @@
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hbase.index.util.IndexManagementUtil;
+
+public class CoprocessorHTableFactory implements HTableFactory {
+
+ /** Number of milliseconds per-interval to retry zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+ /** Number of retries for zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+ private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+ private CoprocessorEnvironment e;
+
+ public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+ this.e = e;
+ }
+
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+ Configuration conf = e.getConfiguration();
+ // make sure writers fail fast
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
+ }
+ return this.e.getTable(tablename.copyBytesIfNecessary());
+ }
+
+ @Override
+ public void shutdown() {
+ // noop
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/HTableFactory.java b/src/main/java/org/apache/hbase/index/table/HTableFactory.java
new file mode 100644
index 0000000..f9e524f
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/HTableFactory.java
@@ -0,0 +1,14 @@
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Source of {@link HTableInterface}s, looked up by table name.
+ */
+public interface HTableFactory {
+
+  /**
+   * Get a table handle for the named table.
+   *
+   * @param tablename name of the table
+   * @return a table for <tt>tablename</tt>; whether it is new or shared depends on the implementation
+   * @throws IOException if the table cannot be obtained
+   */
+  HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
+
+  /**
+   * Signal that this factory will no longer be used; implementations may release held resources.
+   */
+  void shutdown();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java b/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
new file mode 100644
index 0000000..6854003
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
@@ -0,0 +1,46 @@
+package org.apache.hbase.index.table;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Reference to an HTableInterface. Currently, it is pretty simple in that it is just a wrapper
+ * around the table name. Equality and hashing delegate to the wrapped {@link ImmutableBytesPtr}.
+ */
+public class HTableInterfaceReference {
+
+  private final ImmutableBytesPtr tableName;
+
+  public HTableInterfaceReference(ImmutableBytesPtr tableName) {
+    this.tableName = tableName;
+  }
+
+  /** @return the wrapped table-name pointer itself (not a copy) */
+  public ImmutableBytesPtr get() {
+    return this.tableName;
+  }
+
+  /** @return the table name, decoded from only the referenced slice of the backing array */
+  public String getTableName() {
+    return Bytes.toString(this.tableName.get(), this.tableName.getOffset(), this.tableName.getLength());
+  }
+
+  @Override
+  public int hashCode() {
+    return tableName.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false;
+    HTableInterfaceReference other = (HTableInterfaceReference) obj;
+    return tableName.equals(other.tableName);
+  }
+
+  /**
+   * Decode only the referenced offset/length slice, consistent with {@link #getTableName()}, rather
+   * than the whole backing array (which may contain unrelated bytes).
+   */
+  @Override
+  public String toString() {
+    return getTableName();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java b/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
new file mode 100644
index 0000000..3cb2609
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An {@link ImmutableBytesWritable} that caches its hash code, which makes repeated use as a key in
+ * hash-based collections cheap.
+ * <p>
+ * The cached value is recomputed by every constructor that supplies bytes, by each {@code set}
+ * method and by {@link #readFields(DataInput)}. An instance created with the no-arg constructor
+ * reports a hash code of 0 until one of those is called. Equality compares only the referenced
+ * offset/length slice of the backing array.
+ */
+public class ImmutableBytesPtr extends ImmutableBytesWritable {
+  /** cached result of {@link ImmutableBytesWritable#hashCode()} for the current contents */
+  private int hashCode;
+
+  public ImmutableBytesPtr() {
+  }
+
+  public ImmutableBytesPtr(byte[] bytes) {
+    super(bytes);
+    hashCode = super.hashCode();
+  }
+
+  public ImmutableBytesPtr(ImmutableBytesWritable ibw) {
+    super(ibw);
+    hashCode = super.hashCode();
+  }
+
+  public ImmutableBytesPtr(byte[] bytes, int offset, int length) {
+    super(bytes, offset, length);
+    hashCode = super.hashCode();
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false;
+    ImmutableBytesPtr that = (ImmutableBytesPtr) obj;
+    // cheap rejection via the cached hash before comparing bytes
+    if (this.hashCode != that.hashCode) return false;
+    if (Bytes.compareTo(this.get(), this.getOffset(), this.getLength(), that.get(), that.getOffset(), that.getLength()) != 0) return false;
+    return true;
+  }
+
+  /**
+   * Point at the same slice of the same backing array as <tt>ptr</tt> (no copy is made).
+   */
+  public void set(ImmutableBytesWritable ptr) {
+    set(ptr.get(), ptr.getOffset(), ptr.getLength());
+  }
+
+  /**
+   * @param b Use passed bytes as backing array for this instance.
+   */
+  @Override
+  public void set(final byte [] b) {
+    super.set(b);
+    hashCode = super.hashCode();
+  }
+
+  /**
+   * @param b Use passed bytes as backing array for this instance.
+   * @param offset
+   * @param length
+   */
+  @Override
+  public void set(final byte [] b, final int offset, final int length) {
+    super.set(b, offset, length);
+    hashCode = super.hashCode();
+  }
+
+  /**
+   * Read the contents from <tt>in</tt> and then refresh the cached hash code. The hash is recomputed
+   * explicitly rather than relying on the inherited implementation to go through one of the
+   * overridden {@code set} methods; otherwise a deserialized pointer could keep a stale hash and
+   * misbehave in {@link #equals(Object)} and hash-based collections.
+   */
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    super.readFields(in);
+    hashCode = super.hashCode();
+  }
+
+  /**
+   * @return the backing byte array, copying only if necessary
+   */
+  public byte[] copyBytesIfNecessary() {
+    // the backing array can be returned as-is only when this pointer covers all of it
+    if (this.getOffset() == 0 && this.getLength() == this.get().length) {
+      return this.get();
+    }
+    return this.copyBytes();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java b/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
new file mode 100644
index 0000000..f4acb6f
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.ValueGetter;
+import org.apache.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.hbase.index.covered.data.LazyValueGetter;
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.scanner.Scanner;
+
+/**
+ * Utility class to help manage indexes. It covers checking the WAL configuration that mutable
+ * indexing needs, building {@link ValueGetter}s, matching updates against indexed columns, and
+ * propagating indexing failures as {@link IOException}s.
+ */
+public class IndexManagementUtil {
+
+  private IndexManagementUtil() {
+    // private ctor for util classes
+  }
+
+  // Don't rely on statically defined classes constants from classes that may not exist
+  // in earlier HBase versions
+  public static final String INDEX_WAL_EDIT_CODEC_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec";
+  public static final String HLOG_READER_IMPL_KEY = "hbase.regionserver.hlog.reader.impl";
+  public static final String WAL_EDIT_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
+
+  private static final String INDEX_HLOG_READER_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader";
+  private static final Log LOG = LogFactory.getLog(IndexManagementUtil.class);
+
+  /**
+   * Check whether the indexing WAL edit codec is both loadable and configured.
+   *
+   * @param conf configuration to inspect
+   * @return <tt>true</tt> if {@link #INDEX_WAL_EDIT_CODEC_CLASS_NAME} can be loaded and is the value
+   *         of {@link #WAL_EDIT_CODEC_CLASS_KEY}; <tt>false</tt> otherwise
+   */
+  public static boolean isWALEditCodecSet(Configuration conf) {
+    try {
+      // Use reflection to load the IndexedWALEditCodec, since it may not load with an older version
+      // of HBase. Any Throwable (including linkage errors) counts as "not installed".
+      Class.forName(INDEX_WAL_EDIT_CODEC_CLASS_NAME);
+    } catch (Throwable t) {
+      return false;
+    }
+    // if it's installed, it can handle both the compression and non-compression cases
+    return INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null));
+  }
+
+  /**
+   * Verify that the WAL is configured so that mutable indexing can work.
+   * <p>
+   * The check passes if the indexing WAL edit codec is set (see
+   * {@link #isWALEditCodecSet(Configuration)}). Otherwise the indexed HLog reader must be loadable
+   * and configured under {@link #HLOG_READER_IMPL_KEY}, and WAL compression must be disabled.
+   *
+   * @param conf configuration to check
+   * @throws IllegalStateException if neither supported configuration is present
+   */
+  public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
+    // check to see if the WALEditCodec is installed
+    if (isWALEditCodecSet(conf)) {
+      return;
+    }
+
+    // otherwise, we have to install the indexedhlogreader, but it cannot have compression
+    String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
+    String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
+    try {
+      // Use reflection to load the IndexedHLogReader, since it may not load with an older version
+      // of HBase
+      Class.forName(indexLogReaderName);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(codecClass + " is not installed, but "
+          + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY, e);
+    }
+    // NOTE(review): an unset reader key defaults to the indexed reader's own name here, so it is
+    // treated as correctly installed - confirm that this is intended.
+    if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
+      if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) {
+        throw new IllegalStateException("WAL Compression is only supported with " + codecClass
+            + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY);
+      }
+    } else {
+      throw new IllegalStateException(codecClass + " is not installed, but "
+          + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+    }
+  }
+
+  /**
+   * Build a {@link ValueGetter} over the given updates, keyed by each kv's family and qualifier. The
+   * returned value pointers reference the kvs' own buffers rather than copies. If several kvs share
+   * a column, the last one iterated wins.
+   *
+   * @param pendingUpdates kvs whose values should be exposed
+   * @return a getter returning the value for a column, or <tt>null</tt> if no kv matched it
+   */
+  public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+    final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+        .size());
+    for (KeyValue kv : pendingUpdates) {
+      // create new pointers to each part of the kv
+      ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
+      ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
+          kv.getQualifierLength());
+      ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+      valueMap.put(new ReferencingColumn(family, qual), value);
+    }
+    return new ValueGetter() {
+      @Override
+      public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+        return valueMap.get(ReferencingColumn.wrap(ref));
+      }
+    };
+  }
+
+  /**
+   * Hash-map key pairing a column family and qualifier, compared by byte content.
+   */
+  private static class ReferencingColumn {
+    final ImmutableBytesPtr family;
+    final ImmutableBytesPtr qual;
+
+    static ReferencingColumn wrap(ColumnReference ref) {
+      ImmutableBytesPtr family = new ImmutableBytesPtr(ref.getFamily());
+      ImmutableBytesPtr qual = new ImmutableBytesPtr(ref.getQualifier());
+      return new ReferencingColumn(family, qual);
+    }
+
+    public ReferencingColumn(ImmutableBytesPtr family, ImmutableBytesPtr qual) {
+      this.family = family;
+      this.qual = qual;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((family == null) ? 0 : family.hashCode());
+      result = prime * result + ((qual == null) ? 0 : qual.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      ReferencingColumn other = (ReferencingColumn) obj;
+      if (family == null) {
+        if (other.family != null) return false;
+      } else if (!family.equals(other.family)) return false;
+      if (qual == null) {
+        if (other.qual != null) return false;
+      } else if (!qual.equals(other.qual)) return false;
+      return true;
+    }
+  }
+
+  /**
+   * Wrap the scanner in a {@link LazyValueGetter} for the given row.
+   */
+  public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) {
+    return new LazyValueGetter(scanner, currentRow);
+  }
+
+  /**
+   * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful
+   * for an index codec to determine if a given update should even be indexed. This assumes that for
+   * any index, there is going to be a small number of columns, versus the number of kvs in any one
+   * batch.
+   *
+   * @return <tt>true</tt> as soon as any kv matches both the family and qualifier of any column
+   */
+  public static boolean updateMatchesColumns(Collection<KeyValue> update, List<ColumnReference> columns) {
+    if (columns.isEmpty()) {
+      return false;
+    }
+    for (KeyValue kv : update) {
+      // loop-invariant for the inner loop: read the kv's family/qualifier once, not once per column
+      byte[] family = kv.getFamily();
+      byte[] qualifier = kv.getQualifier();
+      for (ColumnReference ref : columns) {
+        if (ref.matchesFamily(family) && ref.matchesQualifier(qualifier)) {
+          // if a single column matches a single kv, we need to build a whole scanner
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful
+   * for an index codec to determine if a given update should even be indexed. This assumes that for
+   * any index, there is going to be a small number of kvs, versus the number of columns in any one
+   * batch.
+   * <p>
+   * This employs the same logic as {@link #updateMatchesColumns(Collection, List)}, but flips the
+   * iteration logic to search columns before kvs.
+   */
+  public static boolean columnMatchesUpdate(List<ColumnReference> columns, Collection<KeyValue> update) {
+    for (ColumnReference ref : columns) {
+      for (KeyValue kv : update) {
+        if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+          // if a single column matches a single kv, we need to build a whole scanner
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Build a raw scan that requests all versions of every column family referenced by the given
+   * column references. Whole families are added, not individual qualifiers.
+   */
+  public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
+    Scan s = new Scan();
+    s.setRaw(true);
+    // add the necessary columns to the scan
+    for (Iterable<? extends ColumnReference> refs : refsArray) {
+      for (ColumnReference ref : refs) {
+        s.addFamily(ref.getFamily());
+      }
+    }
+    s.setMaxVersions();
+    return s;
+  }
+
+  /**
+   * Propagate the given failure as a generic {@link IOException}, if it isn't already.
+   *
+   * @param e
+   *          reason indexing failed. If <tt>null</tt>, throws a {@link NullPointerException}, which
+   *          should unload the coprocessor.
+   * @throws IOException <tt>e</tt> itself if it is an {@link IOException}, otherwise an
+   *           {@link IndexBuildingFailureException} wrapping it
+   */
+  public static void rethrowIndexingException(Throwable e) throws IOException {
+    if (e == null) {
+      // must be checked before the try: the NullPointerException raised by "throw null" inside it
+      // would otherwise be caught below and wrapped, contradicting the documented contract
+      throw new NullPointerException("Cannot rethrow a null indexing failure");
+    }
+    try {
+      throw e;
+    } catch (IOException e1) {
+      LOG.info("Rethrowing " + e);
+      throw e1;
+    } catch (Throwable e1) {
+      LOG.info("Rethrowing " + e1 + " as a " + IndexBuildingFailureException.class.getSimpleName());
+      throw new IndexBuildingFailureException("Failed to build index for unexpected reason!", e1);
+    }
+  }
+
+  /**
+   * Set <tt>key</tt> to <tt>value</tt> only if <tt>conf</tt> has no value for it yet.
+   */
+  public static void setIfNotSet(Configuration conf, String key, int value) {
+    if (conf.get(key) == null) {
+      conf.setInt(key, value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java b/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
new file mode 100644
index 0000000..ca46fce
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
@@ -0,0 +1,155 @@
+package org.apache.hbase.index.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+public class IndexedKeyValue extends KeyValue {
+ private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + indexTableName.hashCode();
+ result = prime * result + Arrays.hashCode(mutation.getRow());
+ return result;
+ }
+
+ private ImmutableBytesPtr indexTableName;
+ private Mutation mutation;
+ // optimization check to ensure that batches don't get replayed to the index more than once
+ private boolean batchFinished = false;
+ private int hashCode;
+
+ public IndexedKeyValue() {}
+
+ public IndexedKeyValue(byte[] bs, Mutation mutation) {
+ this.indexTableName = new ImmutableBytesPtr(bs);
+ this.mutation = mutation;
+ this.hashCode = calcHashCode(indexTableName, mutation);
+ }
+
+ public byte[] getIndexTable() {
+ return this.indexTableName.get();
+ }
+
+ public Mutation getMutation() {
+ return mutation;
+ }
+
+ /**
+ * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link HLog#METAFAMILY} so it
+ * isn't replayed via the normal replay mechanism
+ */
+ @Override
+ public boolean matchingFamily(final byte[] family) {
+ return Bytes.equals(family, HLog.METAFAMILY);
+ }
+
+ @Override
+ public String toString() {
+ return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
+ }
+
+ /**
+ * This is a very heavy-weight operation and should only be done when absolutely necessary - it does a full
+ * serialization of the underyling mutation to compare the underlying data.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(obj == null) return false;
+ if (this == obj) return true;
+ if (getClass() != obj.getClass()) return false;
+ IndexedKeyValue other = (IndexedKeyValue)obj;
+ if (hashCode() != other.hashCode()) return false;
+ if (!other.indexTableName.equals(this.indexTableName)) return false;
+ byte[] current = this.getMutationBytes();
+ byte[] otherMutation = other.getMutationBytes();
+ return Bytes.equals(current, otherMutation);
+ }
+
+ private byte[] getMutationBytes() {
+ ByteArrayOutputStream bos = null;
+ try {
+ bos = new ByteArrayOutputStream();
+ this.mutation.write(new DataOutputStream(bos));
+ bos.flush();
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+ } finally {
+ if (bos != null) {
+ try {
+ bos.close();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ KeyValueCodec.write(out, this);
+ }
+
+ /**
+ * Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done
+ * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
+ * {@link IndexedKeyValue}s.
+ *
+ * @param out
+ * to write data to. Does not close or flush the passed object.
+ * @throws IOException
+ * if there is a problem writing the underlying data
+ */
+ void writeData(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.indexTableName.get());
+ out.writeUTF(this.mutation.getClass().getName());
+ this.mutation.write(out);
+ }
+
+ /**
+ * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. Its the
+ * complement to {@link #writeData(DataOutput)}.
+ */
+ @SuppressWarnings("javadoc")
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
+ Class<? extends Mutation> clazz;
+ try {
+ clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
+ this.mutation = clazz.newInstance();
+ this.mutation.readFields(in);
+ this.hashCode = calcHashCode(indexTableName, mutation);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public boolean getBatchFinished() {
+ return this.batchFinished;
+ }
+
+ public void markBatchFinished() {
+ this.batchFinished = true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java b/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
new file mode 100644
index 0000000..2cdb181
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
@@ -0,0 +1,79 @@
+package org.apache.hbase.index.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}.
+ * An {@link IndexedKeyValue} is distinguished on the wire by a leading length of
+ * {@link #INDEX_TYPE_LENGTH_MARKER}.
+ */
+public class KeyValueCodec {
+
+  /**
+   * KeyValue length marker specifying that its actually an {@link IndexedKeyValue} rather than a
+   * regular {@link KeyValue}.
+   */
+  public static final int INDEX_TYPE_LENGTH_MARKER = -1;
+
+  /**
+   * Read a {@link List} of {@link KeyValue} from the input stream - may contain regular
+   * {@link KeyValue}s or {@link IndexedKeyValue}s.
+   * @param in to read from
+   * @return the next {@link KeyValue}s; an immutable empty list when the count is zero
+   * @throws IOException if the next {@link KeyValue} cannot be read, or the count is negative
+   */
+  public static List<KeyValue> readKeyValues(DataInput in) throws IOException {
+    int size = in.readInt();
+    if (size < 0) {
+      // corrupt input: report it as an IOException instead of letting ArrayList throw
+      // IllegalArgumentException
+      throw new IOException("Invalid number of KeyValues: " + size);
+    }
+    if (size == 0) {
+      return Collections.<KeyValue>emptyList();
+    }
+    List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+    for (int i = 0; i < size; i++) {
+      kvs.add(readKeyValue(in));
+    }
+    return kvs;
+  }
+
+  /**
+   * Read a single {@link KeyValue} from the input stream - may either be a regular {@link KeyValue}
+   * or an {@link IndexedKeyValue}.
+   * @param in to read from
+   * @return the next {@link KeyValue}, if one is available
+   * @throws IOException if the next {@link KeyValue} cannot be read, or its length is negative
+   *           (other than {@link #INDEX_TYPE_LENGTH_MARKER})
+   */
+  public static KeyValue readKeyValue(DataInput in) throws IOException {
+    int length = in.readInt();
+    // its a special IndexedKeyValue
+    if (length == INDEX_TYPE_LENGTH_MARKER) {
+      KeyValue indexed = new IndexedKeyValue();
+      indexed.readFields(in);
+      return indexed;
+    }
+    if (length < 0) {
+      throw new IOException("Invalid KeyValue length: " + length);
+    }
+    KeyValue kv = new KeyValue();
+    kv.readFields(length, in);
+    return kv;
+  }
+
+  /**
+   * Write a {@link KeyValue} or an {@link IndexedKeyValue} to the output stream. These can be read
+   * back via {@link #readKeyValue(DataInput)} or {@link #readKeyValues(DataInput)}.
+   * @param out to write to
+   * @param kv {@link KeyValue} to which to write
+   * @throws IOException if there is an error writing
+   */
+  public static void write(DataOutput out, KeyValue kv) throws IOException {
+    if (kv instanceof IndexedKeyValue) {
+      out.writeInt(INDEX_TYPE_LENGTH_MARKER);
+      ((IndexedKeyValue) kv).writeData(out);
+    } else {
+      kv.write(out);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexCommitter.java b/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
new file mode 100644
index 0000000..4f2386e
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.IndexWriteException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write the index updates to the index tables.
+ */
+public interface IndexCommitter extends Stoppable {
+
+  /**
+   * Prepare this committer to write index updates. {@link IndexWriter} calls this from its
+   * constructor when it is given a {@link RegionCoprocessorEnvironment}.
+   * @param parent the {@link IndexWriter} that owns this committer
+   * @param env environment of the region whose index updates will be written
+   * @param name name identifying this committer; implementations may use it to label resources
+   *          they create (e.g. thread pools)
+   */
+  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+
+  /**
+   * Write the given index updates to their index tables.
+   * @param toWrite mutations to apply, grouped by the index table to which they belong
+   * @throws IndexWriteException if the updates could not be written
+   */
+  void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IndexWriteException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java b/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
new file mode 100644
index 0000000..c2d4b2c
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Handle failures to write to the index tables.
+ */
+public interface IndexFailurePolicy extends Stoppable {
+
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
+
+ /**
+ * Handle the failure of the attempted index updates
+ * @param attempted map of index table -> mutations to apply
+ * @param cause reason why there was a failure
+ * @throws IOException
+ */
+ public void
+ handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexWriter.java b/src/main/java/org/apache/hbase/index/write/IndexWriter.java
new file mode 100644
index 0000000..53a9fd9
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexWriter.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.IndexWriteException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
+ * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
+ * threads, so it will not block the region from shutting down.
+ */
+public class IndexWriter implements Stoppable {
+
+ private static final Log LOG = LogFactory.getLog(IndexWriter.class);
+ private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+ public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+ private IndexCommitter writer;
+ private IndexFailurePolicy failurePolicy;
+
+ /**
+ * @throws IOException if the {@link IndexWriter} or {@link IndexFailurePolicy} cannot be
+ * instantiated
+ */
+ public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
+ this(getCommitter(env), getFailurePolicy(env), env, name);
+ }
+
+ public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+ Configuration conf = env.getConfiguration();
+ try {
+ IndexCommitter committer =
+ conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+ IndexCommitter.class).newInstance();
+ return committer;
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
+ throws IOException {
+ Configuration conf = env.getConfiguration();
+ try {
+ IndexFailurePolicy committer =
+ conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
+ IndexFailurePolicy.class).newInstance();
+ return committer;
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
+ * to be fully setup before calling.
+ * @param committer
+ * @param policy
+ * @param env
+ */
+ public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+ RegionCoprocessorEnvironment env, String name) {
+ this(committer, policy);
+ this.writer.setup(this, env, name);
+ this.failurePolicy.setup(this, env);
+ }
+
+ /**
+ * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
+ * {@link IndexFailurePolicy}.
+ * @param committer to write updates
+ * @param policy to handle failures
+ */
+ IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
+ this.writer = committer;
+ this.failurePolicy = policy;
+ }
+
+ /**
+ * Write the mutations to their respective table.
+ * <p>
+ * This method is blocking and could potentially cause the writer to block for a long time as we
+ * write the index updates. When we return depends on the specified {@link IndexCommitter}.
+ * <p>
+ * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
+ * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
+ * which ensures that the server crashes when an index write fails, ensuring that we get WAL
+ * replay of the index edits.
+ * @param indexUpdates Updates to write
+ * @throws IOException
+ */
+ public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+ // convert the strings to htableinterfaces to which we can talk and group by TABLE
+ Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+ writeAndKillYourselfOnFailure(toWrite);
+ }
+
+ /**
+ * see {@link #writeAndKillYourselfOnFailure(Collection)}.
+ * @param toWrite
+ * @throws IOException
+ */
+ public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+ try {
+ write(toWrite);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Done writing all index updates!\n\t" + toWrite);
+ }
+ } catch (Exception e) {
+ this.failurePolicy.handleFailure(toWrite, e);
+ }
+ }
+
+ /**
+ * Write the mutations to their respective table.
+ * <p>
+ * This method is blocking and could potentially cause the writer to block for a long time as we
+ * write the index updates. We only return when either:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes
+ * to ensure a timely recovery of the failed index writes.
+ * @param toWrite Updates to write
+ * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
+ * stop early depends on the {@link IndexCommitter}.
+ */
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+ write(resolveTableReferences(toWrite));
+ }
+
+ /**
+ * see {@link #write(Collection)}
+ * @param toWrite
+ * @throws IndexWriteException
+ */
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+ throws IndexWriteException {
+ this.writer.write(toWrite);
+ }
+
+
+ /**
+ * Convert the passed index updates to {@link HTableInterfaceReference}s.
+ * @param indexUpdates from the index builder
+ * @return pairs that can then be written by an {@link IndexWriter}.
+ */
+ public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+ Collection<Pair<Mutation, byte[]>> indexUpdates) {
+ Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+ .<HTableInterfaceReference, Mutation> create();
+ // simple map to make lookups easy while we build the map of tables to create
+ Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
+ new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(updates.size());
+ for (Pair<Mutation, byte[]> entry : indexUpdates) {
+ byte[] tableName = entry.getSecond();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+ HTableInterfaceReference table = tables.get(ptr);
+ if (table == null) {
+ table = new HTableInterfaceReference(ptr);
+ tables.put(ptr, table);
+ }
+ updates.put(table, entry.getFirst());
+ }
+
+ return updates;
+ }
+
+ @Override
+ public void stop(String why) {
+ if (!this.stopped.compareAndSet(false, true)) {
+ // already stopped
+ return;
+ }
+ LOG.debug("Stopping because " + why);
+ this.writer.stop(why);
+ this.failurePolicy.stop(why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped.get();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java b/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
new file mode 100644
index 0000000..5ec62db
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+
+import org.apache.hbase.index.table.CoprocessorHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.util.IndexManagementUtil;
+
+/**
+ * Static helpers for configuring the tables used when writing index updates.
+ */
+public final class IndexWriterUtils {
+
+  private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
+
+  /**
+   * Maximum number of threads to allow per-table when writing. Each writer thread (from
+   * {@link ParallelWriterIndexCommitter#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a single
+   * HTable. However, each table is backed by a threadpool to manage the updates to that table. This
+   * specifies the number of threads to allow in each of those tables. Generally, you shouldn't need
+   * to change this, unless you have a small number of indexes to which most of the writes go.
+   * Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
+   * <p>
+   * For tables to which there are not a lot of writes, the thread pool automatically will decrease
+   * the number of threads to one (though it can burst up to the specified max for any given table),
+   * so increasing this to meet the max case is reasonable.
+   * <p>
+   * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
+   * underlying pool works is such that it does direct hand-off of tasks to threads. This works fine
+   * because HTables are assumed to work in a single-threaded context, so we never get more threads
+   * than regionservers. In a multi-threaded context, we can easily grow to more than that number of
+   * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
+   * coprocessor hooks, so we can't modify this behavior.
+   */
+  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+      "index.writer.threads.pertable.max";
+  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+
+  /** Configuration key that HBase uses to set the max number of threads for an HTable */
+  public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+
+  private IndexWriterUtils() {
+    // private ctor for utilities
+  }
+
+  /**
+   * Create the default delegate {@link HTableFactory}: a {@link CoprocessorHTableFactory} over
+   * {@code env}.
+   * <p>
+   * Side effect: the per-table thread limit (read from
+   * {@code index.writer.threads.pertable.max}) is written into {@code env}'s {@link Configuration}
+   * under {@link #HTABLE_THREAD_KEY} via {@link IndexManagementUtil#setIfNotSet} (presumably leaving
+   * any existing value untouched).
+   * @param env environment whose configuration is read and updated
+   * @return a factory for the tables to which index updates are written
+   */
+  public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
+    // create a simple delegate factory, setup the way we need
+    Configuration conf = env.getConfiguration();
+    // set the number of threads allowed per table.
+    int htableThreads =
+        conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
+          IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
+    }
+    IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+    return new CoprocessorHTableFactory(env);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java b/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
new file mode 100644
index 0000000..ad66f17
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Naive failure policy - kills the server on which it resides.
+ * <p>
+ * On any index write failure, asks the {@link Abortable} supplied at setup (the region server
+ * services, when set up from a {@link RegionCoprocessorEnvironment}) to abort. If that abort itself
+ * throws, the JVM is terminated with {@link System#exit(int)}.
+ */
+public class KillServerOnFailurePolicy implements IndexFailurePolicy {
+
+  private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
+  private Abortable abortable;
+  private Stoppable stoppable;
+
+  @Override
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    setup(parent, env.getRegionServerServices());
+  }
+
+  /**
+   * Set up with an explicit {@link Abortable}.
+   * @param parent owner whose stopped state is reported by {@link #isStopped()}
+   * @param abort aborted when an index write fails
+   */
+  public void setup(Stoppable parent, Abortable abort) {
+    this.stoppable = parent;
+    this.abortable = abort;
+  }
+
+  @Override
+  public void stop(String why) {
+    // noop: this policy holds no resources; stopped state is tracked by the parent Stoppable
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stoppable.isStopped();
+  }
+
+  @Override
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+      Exception cause) throws IOException {
+    // cleanup resources
+    this.stop("Killing ourselves because of an error:" + cause);
+    // notify the regionserver of the failure
+    String msg =
+        "Could not update the index table, killing server region because couldn't write to an index table";
+    LOG.error(msg, cause);
+    try {
+      this.abortable.abort(msg, cause);
+    } catch (Exception e) {
+      // log why the abort failed, otherwise the reason for the hard kill is lost
+      LOG.fatal("Couldn't abort this server to preserve index writes, "
+          + "attempting to hard kill the server", e);
+      System.exit(1);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java b/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..b85aa94
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hbase.index.table.CachingHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
+ * any of the index updates fails. Completion is determined by the following criteria:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
+ * ensure a timely recovery of the failed index writes.
+ */
+public class ParallelWriterIndexCommitter implements IndexCommitter {
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.writer.threads.keepalivetime";
+  private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+  private HTableFactory factory;
+  private Stoppable stopped;
+  private QuickFailingTaskRunner pool;
+
+  /**
+   * Set up against the region's environment. This builds a thread pool named {@code name}, with
+   * its max size read from {@link #NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY} (default 10) and
+   * its core timeout from {@code index.writer.threads.keepalivetime}. It also builds a caching
+   * table factory over {@link IndexWriterUtils#getDefaultDelegateHTableFactory}.
+   */
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   * @param factory delegate factory, wrapped in a {@link CachingHTableFactory}
+   * @param pool executor on which the per-table writes run
+   * @param abortable currently unused
+   * @param stop source of this committer's stopped state
+   * @param cacheSize size of the {@link CachingHTableFactory} cache
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.pool = new QuickFailingTaskRunner(pool);
+    this.stopped = stop;
+  }
+
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws SingleIndexWriteFailureException {
+    /*
+     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
+     * writes in parallel to each index table, so each table gets its own task and is submitted to
+     * the pool. Where it gets tricky is that we want to block the calling thread until one of two
+     * things happens: (1) all index tables get successfully updated, or (2) any one of the index
+     * table writes fail; in either case, we should return as quickly as possible. We get a little
+     * more complicated in that if we do get a single failure, but any of the index writes hasn't
+     * been started yet (it's been queued up, but not submitted to a thread) we want that task to
+     * fail immediately as we know that write is a waste and will need to be replayed anyways.
+     */
+
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. List-backed multimaps (e.g. the ArrayListMultimap built
+      // by IndexWriter#resolveTableReferences) are used directly to avoid copying all the index
+      // updates; other Multimap implementations get copied instead of failing with a
+      // ClassCastException.
+      final List<Mutation> mutations = toMutationList(entry.getValue());
+      final HTableInterfaceReference tableReference = entry.getKey();
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter usage, interrupting the thread, will work in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when it
+       * supports an interrupt).
+       */
+      tasks.add(new Task<Void>() {
+
+        /**
+         * Do the actual write to the index table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Void call() throws Exception {
+          // this may have been queued, so another task in front of us may have failed, so we
+          // should early exit, if that's the case
+          throwFailureIfDone();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+          }
+          try {
+            HTableInterface table = factory.getTable(tableReference.get());
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (SingleIndexWriteFailureException e) {
+            throw e;
+          } catch (IOException e) {
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          }
+          return null;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+        }
+      });
+    }
+
+    // actually submit the tasks to the pool and wait for them to finish/fail
+    try {
+      pool.submitUninterruptible(tasks);
+    } catch (EarlyExitFailure e) {
+      propagateFailure(e);
+    } catch (ExecutionException e) {
+      LOG.error("Found a failed index update!", e.getCause());
+      propagateFailure(e.getCause());
+    }
+  }
+
+  /**
+   * View the given mutations as a {@link List}, copying them only if they are not already a list.
+   */
+  private static List<Mutation> toMutationList(Collection<Mutation> mutations) {
+    if (mutations instanceof List) {
+      return (List<Mutation>) mutations;
+    }
+    return new java.util.ArrayList<Mutation>(mutations);
+  }
+
+  /**
+   * Rethrow {@code throwable} unchanged if it already is a
+   * {@link SingleIndexWriteFailureException}, otherwise wrap it in one.
+   */
+  private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+    if (throwable instanceof SingleIndexWriteFailureException) {
+      throw (SingleIndexWriteFailureException) throwable;
+    }
+    throw new SingleIndexWriteFailureException(
+        "Got an abort notification while writing to the index!", throwable);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
+   * by the external {@link Stoppable}. This call does not delegate the stop down to the
+   * {@link Stoppable} passed to setup.
+   * @param why the reason for stopping
+   */
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
new file mode 100644
index 0000000..99be157
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+
+/**
+ * Cache of pending index edits, grouped by index table, for each {@link HRegion}.
+ * <p>
+ * Not thread-safe: backed by a plain {@link HashMap} of {@link ArrayListMultimap}s, so callers
+ * sharing an instance must synchronize externally.
+ */
+public class PerRegionIndexWriteCache {
+
+  private final Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache =
+      new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>();
+
+  /**
+   * Get the edits for the given region and remove them from the cache. To add them back, call
+   * {@link #addEdits(HRegion, HTableInterfaceReference, Collection)}.
+   * @param region region whose pending edits to take
+   * @return the edits for the given region, grouped by index table, or <tt>null</tt> if there are
+   *         no pending edits for the region
+   */
+  public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) {
+    return cache.remove(region);
+  }
+
+  /**
+   * Add edits for the given index table to the pending edits of {@code region}. The edits are
+   * appended after any edits already cached for that region and table.
+   * @param region region to which the edits belong
+   * @param table index table to which the edits should be written
+   * @param collection the edits; they are copied into the cache, so later changes to this
+   *          collection are not reflected
+   */
+  public void addEdits(HRegion region, HTableInterfaceReference table,
+      Collection<Mutation> collection) {
+    Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region);
+    if (edits == null) {
+      edits = ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
+      cache.put(region, edits);
+    }
+    edits.putAll(table, collection);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
new file mode 100644
index 0000000..642ff37
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.write.IndexFailurePolicy;
+import org.apache.hbase.index.write.KillServerOnFailurePolicy;
+
+/**
+ * Tracks any failed writes in the {@link PerRegionIndexWriteCache}, given a
+ * {@link MultiIndexWriteFailureException} (which is thrown from the
+ * {@link TrackingParallelWriterIndexCommitter}). Any other exception failure causes a server
+ * abort via the usual {@link KillServerOnFailurePolicy}.
+ */
+public class StoreFailuresInCachePolicy implements IndexFailurePolicy {
+
+  private KillServerOnFailurePolicy delegate;
+  private final PerRegionIndexWriteCache cache;
+  private HRegion region;
+
+  /**
+   * @param failedIndexEdits cache to update when we find a failure
+   */
+  public StoreFailuresInCachePolicy(PerRegionIndexWriteCache failedIndexEdits) {
+    this.cache = failedIndexEdits;
+  }
+
+  /**
+   * Bind this policy to the region of <tt>env</tt> and set up the delegate
+   * {@link KillServerOnFailurePolicy} that handles failures this policy cannot cache.
+   * @param parent passed through to the delegate's setup
+   * @param env environment whose region the cached edits are recorded against
+   */
+  @Override
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    this.region = env.getRegion();
+    this.delegate = new KillServerOnFailurePolicy();
+    this.delegate.setup(parent, env);
+  }
+
+  /**
+   * Cache the attempted edits for every table reported as failed by a
+   * {@link MultiIndexWriteFailureException}. Any other cause is handed to the delegate
+   * {@link KillServerOnFailurePolicy}, and nothing is cached.
+   * @param attempted the index edits that were attempted, keyed by target table
+   * @param cause why the write failed
+   * @throws IOException propagated from the delegate policy
+   */
+  @Override
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+      Exception cause) throws IOException {
+    // if its not an exception we can handle, let the delegate take care of it
+    if (!(cause instanceof MultiIndexWriteFailureException)) {
+      delegate.handleFailure(attempted, cause);
+      // Stop here: the cast below is only valid for MultiIndexWriteFailureException. Falling
+      // through would throw ClassCastException for other causes, or NPE for a null cause.
+      return;
+    }
+    List<HTableInterfaceReference> failedTables =
+        ((MultiIndexWriteFailureException) cause).getFailedTables();
+    for (HTableInterfaceReference table : failedTables) {
+      cache.addEdits(this.region, table, attempted.get(table));
+    }
+  }
+
+  @Override
+  public void stop(String why) {
+    this.delegate.stop(why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.delegate.isStopped();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..1d4f02d
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.CapturingAbortable;
+import org.apache.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.TaskRunner;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.hbase.index.table.CachingHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.write.IndexCommitter;
+import org.apache.hbase.index.write.IndexWriter;
+import org.apache.hbase.index.write.IndexWriterUtils;
+import org.apache.hbase.index.write.ParallelWriterIndexCommitter;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted
+ * to allow the caller to retrieve the failed and succeeded index updates. Therefore, this class
+ * will be a lot slower, in the face of failures, when compared to the
+ * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
+ * you need to at least attempt all writes and know their result; for instance, this is fine for
+ * doing WAL recovery - it's not a performance intensive situation and we want to limit the
+ * edits we need to retry.
+ * <p>
+ * On failure to {@link #write(Multimap)}, we throw a {@link MultiIndexWriteFailureException} that
+ * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}).
+ * This causes any pending tasks to fail whatever they are doing as fast as possible. Any writes
+ * that have not begun are not even attempted and are marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
+ * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
+ * exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the
+ * failure back to the client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+  private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY =
+      "index.trackingwriter.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.trackingwriter.threads.keepalivetime";
+
+  private TaskRunner pool;
+  private HTableFactory factory;
+  private CapturingAbortable abortable;
+  private Stoppable stopped;
+
+  /**
+   * Set up from the region's configuration. Writes run on a thread pool whose maximum size is
+   * read from {@link #NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY} (default 10). The region
+   * server services act as the {@link Abortable}, and <tt>parent</tt> acts as the
+   * {@link Stoppable}.
+   */
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   * @param factory creates index table handles; wrapped in a {@link CachingHTableFactory}
+   * @param pool executor the per-table write tasks run on (wrapped in a
+   *          {@link WaitForCompletionTaskRunner})
+   * @param abortable checked, via a {@link CapturingAbortable}, before each write is attempted
+   * @param stop checked before each write is attempted; also backs {@link #isStopped()}
+   * @param cacheSize size passed to the {@link CachingHTableFactory}
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.pool = new WaitForCompletionTaskRunner(pool);
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.abortable = new CapturingAbortable(abortable);
+    this.stopped = stop;
+  }
+
+  /**
+   * Attempt every table's batch of index updates and wait for all of them to finish.
+   * @param toWrite index updates keyed by target index table
+   * @throws MultiIndexWriteFailureException listing every table whose batch did not succeed
+   */
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws MultiIndexWriteFailureException {
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+    // tables.get(i) is the target of the i-th task, so results can be mapped back to tables
+    List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table, avoiding a copy whenever the multimap already holds a List
+      final List<Mutation> mutations = asList(entry.getValue());
+      // track each reference so we can get at it easily later, when determining failures
+      final HTableInterfaceReference tableReference = entry.getKey();
+      tables.add(tableReference);
+
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter usage, interrupting the thread, will work in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when it
+       * supports an interrupt).
+       */
+      tasks.add(new Task<Boolean>() {
+
+        /**
+         * Do the actual write to the target index table. We don't need to worry about closing the
+         * table because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Boolean call() throws Exception {
+          try {
+            // this may have been queued, but there was an abort/stop so we try to early exit
+            throwFailureIfDone();
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+            }
+            HTableInterface table = factory.getTable(tableReference.get());
+            // check again right before starting the batch
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw e;
+          }
+          return Boolean.TRUE;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (stopped.isStopped() || abortable.isAborted()
+              || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+        }
+      });
+    }
+
+    List<Boolean> results = null;
+    try {
+      LOG.debug("Waiting on index update tasks to complete...");
+      results = this.pool.submitUninterruptible(tasks);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(
+          "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+    } catch (EarlyExitFailure e) {
+      throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+    }
+
+    // track the failures. We only ever access this on return from our calls, so no extra
+    // synchronization is needed. We could update all the failures as we find them, but that adds
+    // a lot of locking overhead, and just doing the copy later is about as efficient.
+    List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+    int index = 0;
+    for (Boolean result : results) {
+      // a null result marks a failed task; we know which table failed by the index of the result
+      if (result == null) {
+        failures.add(tables.get(index));
+      }
+      index++;
+    }
+
+    // if any of the tasks failed, then we need to propagate the failure
+    if (!failures.isEmpty()) {
+      // make the list unmodifiable to avoid any more synchronization concerns
+      throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+    }
+  }
+
+  /**
+   * View one table's mutations as a {@link List}, as required by HTableInterface#batch.
+   * <p>
+   * When the multimap already stores a List per key (e.g. an ArrayListMultimap), that List is used
+   * directly to avoid copying every index update. Otherwise the mutations are copied, so
+   * Multimap implementations with non-List values no longer fail with a ClassCastException.
+   */
+  private static List<Mutation> asList(Collection<Mutation> mutations) {
+    if (mutations instanceof List) {
+      return (List<Mutation>) mutations;
+    }
+    return new ArrayList<Mutation>(mutations);
+  }
+
+  /**
+   * Stop the task runner and shut down the caching table factory.
+   */
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName());
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  /**
+   * @return whether the {@link Stoppable} passed to setup reports itself as stopped
+   */
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/src/main/java/org/apache/phoenix/cache/GlobalCache.java
new file mode 100644
index 0000000..50ae9d9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+
+
+/**
+ *
+ * Global root cache for the server. Each tenant is managed as a child tenant cache of this one. Queries
+ * not associated with a particular tenant use this as their tenant cache.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalCache extends TenantCacheImpl {
+    private static GlobalCache INSTANCE;
+
+    private final Configuration config;
+    // TODO: Use Guava cache with auto removal after lack of access
+    private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
+    // Cache for latest PTable for a given Phoenix table
+    private final ConcurrentHashMap<ImmutableBytesPtr,PTable> metaDataCacheMap = new ConcurrentHashMap<ImmutableBytesPtr,PTable>();
+
+    /**
+     * Get the shared instance, creating it from the configuration of <tt>env</tt> on first use.
+     * Later calls ignore <tt>env</tt> and return the already created instance.
+     * @param env coprocessor environment whose configuration is used on first creation
+     * @return the shared GlobalCache
+     */
+    public static synchronized GlobalCache getInstance(RegionCoprocessorEnvironment env) {
+        // See http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
+        // for explanation of why double locking doesn't work.
+        if (INSTANCE == null) {
+            INSTANCE = new GlobalCache(env.getConfiguration());
+        }
+        return INSTANCE;
+    }
+
+    /**
+     * @return the live (mutable) map caching the latest PTable per Phoenix table
+     */
+    public ConcurrentHashMap<ImmutableBytesPtr,PTable> getMetaDataCache() {
+        return metaDataCacheMap;
+    }
+
+    /**
+     * Get the tenant cache associated with the tenantId. If tenantId is not applicable, null may be
+     * used in which case a global tenant cache is returned.
+     * @param env the region coprocessor environment, used to obtain the GlobalCache instance
+     * @param tenantId the tenant ID or null if not applicable.
+     * @return TenantCache
+     */
+    public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId) {
+        GlobalCache globalCache = GlobalCache.getInstance(env);
+        TenantCache tenantCache = tenantId == null ? globalCache : globalCache.getChildTenantCache(tenantId);
+        return tenantCache;
+    }
+
+    private GlobalCache(Configuration config) {
+        super(new GlobalMemoryManager(getMaxMemorySize(config),
+                config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
+              config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+        this.config = config;
+    }
+
+    /**
+     * Compute the memory budget, in bytes, for the global memory manager: the configured
+     * percentage ({@link QueryServices#MAX_MEMORY_PERC_ATTRIB}) of the JVM's maximum heap.
+     * <p>
+     * {@link Runtime#maxMemory()} is used instead of {@link Runtime#totalMemory()} because the
+     * latter is only the currently committed heap. It may be far smaller than the heap the JVM is
+     * allowed to grow to, and since this budget is computed only once, it would stay undersized.
+     * If the JVM reports no limit (Long.MAX_VALUE), fall back to the committed heap so the
+     * multiplication cannot overflow.
+     */
+    private static long getMaxMemorySize(Configuration config) {
+        long heapSize = Runtime.getRuntime().maxMemory();
+        if (heapSize == Long.MAX_VALUE) {
+            heapSize = Runtime.getRuntime().totalMemory();
+        }
+        return heapSize * config.getInt(MAX_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_PERC) / 100;
+    }
+
+    /**
+     * @return the configuration this cache was created with
+     */
+    public Configuration getConfig() {
+        return config;
+    }
+
+    /**
+     * Retrieve the tenant cache given a tenantId, creating it if it does not exist yet.
+     * @param tenantId the ID that identifies the tenant
+     * @return the existing or newly created TenantCache
+     */
+    public TenantCache getChildTenantCache(ImmutableBytesWritable tenantId) {
+        TenantCache tenantCache = perTenantCacheMap.get(tenantId);
+        if (tenantCache == null) {
+            int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+            int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
+            // if another thread raced us, keep its cache and discard ours
+            tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
+            if (tenantCache == null) {
+                tenantCache = newTenantCache;
+            }
+        }
+        return tenantCache;
+    }
+}