Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:16:01 UTC
[45/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..46a771b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.index.write.recovery;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.index.CapturingAbortable;
+import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hadoop.hbase.index.parallel.Task;
+import org.apache.hadoop.hbase.index.parallel.TaskBatch;
+import org.apache.hadoop.hbase.index.parallel.TaskRunner;
+import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hadoop.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
+import org.apache.hadoop.hbase.index.table.HTableFactory;
+import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
+import org.apache.hadoop.hbase.index.write.IndexCommitter;
+import org.apache.hadoop.hbase.index.write.IndexWriter;
+import org.apache.hadoop.hbase.index.write.IndexWriterUtils;
+import org.apache.hadoop.hbase.index.write.ParallelWriterIndexCommitter;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted,
+ * so the caller can retrieve the failed and succeeded index updates. Therefore, this class will
+ * be a lot slower, in the face of failures, when compared to the
+ * {@link ParallelWriterIndexCommitter} (though as fast for successful writes), so it should be
+ * used only when you need to at least attempt all writes and know their result; for instance,
+ * this is fine for doing WAL recovery - it's not a performance-intensive situation and we want
+ * to limit the edits we need to retry.
+ * <p>
+ * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
+ * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}).
+ * This causes any pending tasks to fail whatever they are doing as fast as possible. Any writes
+ * that have not begun are not even attempted and are marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
+ * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
+ * exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the
+ * failure back to the client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+ private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+ public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
+ private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+ private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+ "index.trackingwriter.threads.keepalivetime";
+
+ private TaskRunner pool;
+ private HTableFactory factory;
+ private CapturingAbortable abortable;
+ private Stoppable stopped;
+
+ @Override
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ Configuration conf = env.getConfiguration();
+ setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+ ThreadPoolManager.getExecutor(
+ new ThreadPoolBuilder(name, conf).
+ setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+ DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+ setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+ env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+ }
+
+ /**
+ * Setup <tt>this</tt>.
+ * <p>
+ * Exposed for TESTING
+ */
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+ int cacheSize) {
+ this.pool = new WaitForCompletionTaskRunner(pool);
+ this.factory = new CachingHTableFactory(factory, cacheSize);
+ this.abortable = new CapturingAbortable(abortable);
+ this.stopped = stop;
+ }
+
+ @Override
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+ throws MultiIndexWriteFailureException {
+ Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+ TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+ List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+ for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. We leak the implementation here a little bit to save
+      // doing a complete copy of all the index updates for each table.
+ final List<Mutation> mutations = (List<Mutation>) entry.getValue();
+      // track each reference so we can get at it easily later, when determining failures
+ final HTableInterfaceReference tableReference = entry.getKey();
+ tables.add(tableReference);
+
+ /*
+ * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+ * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+ * running thread. The former will only work if we are not in the midst of writing the current
+ * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter usage, interrupting the thread, will work in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate on when it
+       * supports an interrupt).
+ */
+ tasks.add(new Task<Boolean>() {
+
+ /**
+         * Do the actual write to the index table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+ */
+ @Override
+ public Boolean call() throws Exception {
+ try {
+ // this may have been queued, but there was an abort/stop so we try to early exit
+ throwFailureIfDone();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+ }
+ HTableInterface table = factory.getTable(tableReference.get());
+ throwFailureIfDone();
+ table.batch(mutations);
+ } catch (InterruptedException e) {
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ }
+ return Boolean.TRUE;
+ }
+
+ private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+ if (stopped.isStopped() || abortable.isAborted()
+ || Thread.currentThread().isInterrupted()) {
+ throw new SingleIndexWriteFailureException(
+ "Pool closed, not attempting to write to the index!", null);
+ }
+
+ }
+ });
+ }
+
+ List<Boolean> results = null;
+ try {
+ LOG.debug("Waiting on index update tasks to complete...");
+ results = this.pool.submitUninterruptible(tasks);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(
+ "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+ } catch (EarlyExitFailure e) {
+      throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+ }
+
+ // track the failures. We only ever access this on return from our calls, so no extra
+    // synchronization is needed. We could update all the failures as we find them, but that adds a
+ // lot of locking overhead, and just doing the copy later is about as efficient.
+ List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+ int index = 0;
+ for (Boolean result : results) {
+ // there was a failure
+ if (result == null) {
+ // we know which table failed by the index of the result
+ failures.add(tables.get(index));
+ }
+ index++;
+ }
+
+ // if any of the tasks failed, then we need to propagate the failure
+ if (failures.size() > 0) {
+ // make the list unmodifiable to avoid any more synchronization concerns
+ throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+ }
+ return;
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.info("Shutting down " + this.getClass().getSimpleName());
+ this.pool.stop(why);
+ this.factory.shutdown();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped.isStopped();
+ }
+}
\ No newline at end of file
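
For context, a minimal caller-side sketch of the contract described in the class javadoc above
(not part of this patch): the multimap contents are illustrative, and the getFailedTables()
accessor on the exception is an assumption.

    // committer is a TrackingParallelWriterIndexCommitter already configured via setup(...)
    Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap.create();
    // ... add the per-index-table mutations to apply ...
    try {
        committer.write(updates);
    } catch (MultiIndexWriteFailureException e) {
        // every write was still attempted; only these tables need to be retried
        List<HTableInterfaceReference> failed = e.getFailedTables(); // assumed accessor
        LOG.warn("Index updates failed for: " + failed);
    }
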
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
new file mode 100644
index 0000000..8314fef
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Like a {@link KeyValueSkipListSet}, but also exposes useful, atomic methods (e.g.
+ * {@link #putIfAbsent(KeyValue)}).
+ */
+public class IndexKeyValueSkipListSet extends KeyValueSkipListSet {
+
+  // it is annoying that we need to keep this extra pointer around here, but it's pretty minimal
+  // and means we don't need to change the HBase code.
+ private ConcurrentSkipListMap<KeyValue, KeyValue> delegate;
+
+ /**
+ * Create a new {@link IndexKeyValueSkipListSet} based on the passed comparator.
+ * @param comparator to use when comparing keyvalues. It is used both to determine sort order as
+ * well as object equality in the map.
+ * @return a map that uses the passed comparator
+ */
+ public static IndexKeyValueSkipListSet create(Comparator<KeyValue> comparator) {
+ ConcurrentSkipListMap<KeyValue, KeyValue> delegate =
+ new ConcurrentSkipListMap<KeyValue, KeyValue>(comparator);
+ IndexKeyValueSkipListSet ret = new IndexKeyValueSkipListSet(delegate);
+ return ret;
+ }
+
+ /**
+ * @param delegate map to which to delegate all calls
+ */
+ public IndexKeyValueSkipListSet(ConcurrentSkipListMap<KeyValue, KeyValue> delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ }
+
+ /**
+   * Add the passed {@link KeyValue} to the set, only if one is not already present. This is
+   * equivalent to
+   * <pre>
+   * if (!map.containsKey(kv))
+   *   return map.put(kv, kv);
+   * else
+   *   return map.get(kv);
+ * </pre>
+ * except that the action is performed atomically.
+ * @param kv {@link KeyValue} to add
+ * @return the previous value associated with the specified key, or <tt>null</tt> if there was no
+ * previously stored key
+ * @throws ClassCastException if the specified key cannot be compared with the keys currently in
+ * the map
+ * @throws NullPointerException if the specified key is null
+ */
+ public KeyValue putIfAbsent(KeyValue kv) {
+ return this.delegate.putIfAbsent(kv, kv);
+ }
+}
\ No newline at end of file
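
A minimal sketch of the atomic put-if-absent behavior documented above (not part of this patch);
KeyValue.COMPARATOR is the standard HBase comparator and the row/family/qualifier bytes are
illustrative.

    IndexKeyValueSkipListSet kvs = IndexKeyValueSkipListSet.create(KeyValue.COMPARATOR);
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
        Bytes.toBytes("qual"), Bytes.toBytes("value"));
    KeyValue previous = kvs.putIfAbsent(kv);
    // previous is null on the first insert; later calls return the KeyValue already stored
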
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
new file mode 100644
index 0000000..bad82c4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.io.Writable;
+
+
+
+/**
+ * A WALReader that can also deserialize custom {@link WALEdit}s that contain index information.
+ * <p>
+ * This is basically a wrapper around a {@link SequenceFileLogReader} that has a custom
+ * {@link SequenceFileLogReader.WALReader#next(Object)} method that only replaces the creation of the WALEdit with our own custom
+ * type.
+ * <p>
+ * This is a little bit of a painful way of going about this, but saves the effort of hacking the
+ * HBase source (and dealing with getting it reviewed and backported, etc.) and still works.
+ */
+/*
+ * TODO: Support splitting index updates into their own WAL entries on recovery (basically, just
+ * queue them up in next), if we know that the region was on the server when it crashed. However,
+ * this is kind of difficult as we need to know a lot about the state of the system - basically,
+ * we need to track which of the regions were on the server when it crashed and only split those
+ * edits out into their respective regions.
+ */
+public class IndexedHLogReader implements Reader {
+ private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class);
+
+ private SequenceFileLogReader delegate;
+
+
+ private static class IndexedWALReader extends SequenceFileLogReader.WALReader {
+
+ /**
+ * @param fs
+ * @param p
+ * @param c
+ * @throws IOException
+ */
+ IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException {
+ super(fs, p, c);
+ }
+
+ /**
+ * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without
+ * the check on the value class, since we have a special value class that doesn't directly match
+ * what was specified in the file header
+ */
+ @Override
+ public synchronized boolean next(Writable key, Writable val) throws IOException {
+ boolean more = next(key);
+
+ if (more) {
+ getCurrentValue(val);
+ }
+
+ return more;
+ }
+
+ }
+
+ public IndexedHLogReader() {
+ this.delegate = new SequenceFileLogReader();
+ }
+
+ @Override
+ public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException {
+ this.delegate.init(fs, path, conf);
+ // close the old reader and replace with our own, custom one
+ this.delegate.reader.close();
+ this.delegate.reader = new IndexedWALReader(fs, path, conf);
+ Exception e = new Exception();
+ LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace()));
+ LOG.info("Got conf: " + conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.delegate.close();
+ }
+
+ @Override
+ public Entry next() throws IOException {
+ return next(null);
+ }
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ delegate.entryStart = delegate.reader.getPosition();
+ HLog.Entry e = reuse;
+ if (e == null) {
+ HLogKey key;
+ if (delegate.keyClass == null) {
+ key = HLog.newKey(delegate.conf);
+ } else {
+ try {
+ key = delegate.keyClass.newInstance();
+ } catch (InstantiationException ie) {
+ throw new IOException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IOException(iae);
+ }
+ }
+ WALEdit val = new WALEdit();
+ e = new HLog.Entry(key, val);
+ }
+
+ // now read in the HLog.Entry from the WAL
+ boolean nextPairValid = false;
+ try {
+ if (delegate.compressionContext != null) {
+ throw new UnsupportedOperationException(
+            "Reading compression isn't supported with the IndexedHLogReader! Compressed WALEdits "
+                + "are only supported for HBase 0.94.9+ and with the IndexedWALEditCodec!");
+ }
+ // this is the special bit - we use our custom entry to read in the key-values that have index
+ // information, but otherwise it looks just like a regular WALEdit
+ IndexedWALEdit edit = new IndexedWALEdit(e.getEdit());
+ nextPairValid = delegate.reader.next(e.getKey(), edit);
+ } catch (IOException ioe) {
+ throw delegate.addFileInfoToException(ioe);
+ }
+ delegate.edit++;
+ if (delegate.compressionContext != null && delegate.emptyCompressionContext) {
+ delegate.emptyCompressionContext = false;
+ }
+ return nextPairValid ? e : null;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.delegate.seek(pos);
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return this.delegate.getPosition();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ this.delegate.reset();
+ }
+}
\ No newline at end of file
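
For context, a minimal read-loop sketch using this reader directly (not part of this patch). The
filesystem, configuration, WAL path and process() handler are illustrative; in a deployment the
reader is normally selected through the hbase.regionserver.hlog.reader.impl setting rather than
instantiated by hand.

    IndexedHLogReader reader = new IndexedHLogReader();
    reader.init(fs, new Path("/hbase/.logs/example-wal"), conf); // hypothetical path
    try {
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
            // the returned edit contains both regular and index KeyValues,
            // deserialized via the IndexedWALEdit wrapper described below
            process(entry); // hypothetical handler
        }
    } finally {
        reader.close();
    }
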
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
new file mode 100644
index 0000000..6749cc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+
+/**
+ * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader
+ * <p>
+ * This class should only be used with HBase < 0.94.9. Newer installations of HBase should
+ * instead use the IndexedWALEditCodec along with the correct configuration options.
+ */
+public class IndexedWALEdit extends WALEdit {
+ //reproduced here so we don't need to modify the HBase source.
+ private static final int VERSION_2 = -1;
+ private WALEdit delegate;
+
+ /**
+   * Copy-constructor. Only does a surface copy of the delegate's fields - no actual data is copied, only referenced.
+ * @param delegate to copy
+ */
+ @SuppressWarnings("deprecation")
+ public IndexedWALEdit(WALEdit delegate) {
+ this.delegate = delegate;
+ // reset the delegate's fields
+ this.delegate.getKeyValues().clear();
+ if (this.delegate.getScopes() != null) {
+ this.delegate.getScopes().clear();
+ }
+ }
+
+ public IndexedWALEdit() {
+
+ }
+
+ @Override
+  public void setCompressionContext(CompressionContext context) {
+ throw new UnsupportedOperationException(
+ "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead.");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ delegate.getKeyValues().clear();
+ if (delegate.getScopes() != null) {
+ delegate.getScopes().clear();
+ }
+ // ----------------------------------------------------------------------------------------
+ // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to
+ // capture the index updates
+ // -----------------------------------------------------------------------------------------
+ int versionOrLength = in.readInt();
+ if (versionOrLength != VERSION_2) {
+      throw new IOException("You must update your cluster to the latest version of HBase and"
+ + " clean out all logs (cleanly start and then shutdown) before enabling indexing!");
+ }
+ // this is new style HLog entry containing multiple KeyValues.
+ List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
+ delegate.getKeyValues().addAll(kvs);
+
+ // then read in the rest of the WALEdit
+ int numFamilies = in.readInt();
+ NavigableMap<byte[], Integer> scopes = delegate.getScopes();
+ if (numFamilies > 0) {
+ if (scopes == null) {
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ }
+ for (int i = 0; i < numFamilies; i++) {
+ byte[] fam = Bytes.readByteArray(in);
+ int scope = in.readInt();
+ scopes.put(fam, scope);
+ }
+ delegate.setScopes(scopes);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new IOException(
+ "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead");
+ }
+}
\ No newline at end of file
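
For reference, the wire layout that readFields() above expects, derived from the code (the field
names are descriptive only, not part of this patch):

    // int    versionOrLength  - must be VERSION_2 (-1), otherwise the edit is rejected
    //        KeyValueCodec    - the KeyValues, regular and IndexedKeyValue entries alike
    // int    numFamilies      - count of replication-scope entries that follow
    // repeated numFamilies times:
    //   byte[] family  (Bytes.readByteArray)
    //   int    scope
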
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
new file mode 100644
index 0000000..01d7390
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -0,0 +1,195 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Decoder;
+import org.apache.hadoop.hbase.codec.Encoder;
+
+import org.apache.hadoop.hbase.index.wal.IndexedKeyValue;
+import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+
+
+/**
+ * Support custom indexing {@link KeyValue}s when written to the WAL.
+ * <p>
+ * Currently, we don't support reading older WAL files - only new WAL files. Therefore, this should
+ * not be installed on a running cluster, but rather one that has been cleanly shutdown and requires
+ * no WAL replay on startup.
+ */
+public class IndexedWALEditCodec extends WALEditCodec {
+
+  // can't have negative values because reading off a stream returns a negative if it's the end of
+ // the stream
+ private static final int REGULAR_KEY_VALUE_MARKER = 0;
+ private CompressionContext compression;
+
+ /** Required nullary constructor */
+ public IndexedWALEditCodec() {
+ }
+
+ /**
+ * Override the parent implementation so we can get access to the current context too
+ * @param compression compression to support for the encoder/decoder
+ */
+ @Override
+ public void setCompression(CompressionContext compression) {
+ super.setCompression(compression);
+ this.compression = compression;
+ }
+
+ @Override
+ public Decoder getDecoder(InputStream is) {
+ // compression isn't enabled
+ if (this.compression == null) {
+ return new IndexKeyValueDecoder(is);
+ }
+
+ // there is compression, so we get the standard decoder to handle reading those kvs
+ Decoder decoder = super.getDecoder(is);
+    // compression is on, return our custom decoder
+ return new CompressedIndexKeyValueDecoder(is, decoder);
+ }
+
+ @Override
+ public Encoder getEncoder(OutputStream os) {
+ // compression isn't on, do the default thing
+ if (this.compression == null) {
+ return new IndexKeyValueEncoder(os);
+ }
+
+ // compression is on, return our one that will handle putting in the correct markers
+ Encoder encoder = super.getEncoder(os);
+ return new CompressedIndexKeyValueEncoder(os, encoder);
+ }
+
+ /**
+ * Custom {@link Decoder} that can handle a stream of regular and indexed {@link KeyValue}s.
+ */
+ public class IndexKeyValueDecoder extends BaseDecoder {
+
+ /**
+     * Create a {@link Decoder} on the given input stream that parses both regular and indexed
+     * {@link KeyValue}s via the {@link KeyValueCodec}.
+ * @param is stream to read from
+ */
+ public IndexKeyValueDecoder(InputStream is){
+ super(is);
+ }
+
+ @Override
+ protected KeyValue parseCell() throws IOException{
+ return KeyValueCodec.readKeyValue((DataInput) this.in);
+ }
+ }
+
+ public class CompressedIndexKeyValueDecoder extends BaseDecoder {
+
+ private Decoder decoder;
+
+ /**
+ * Create a {@link Decoder} on the given input stream with the given {@link Decoder} to parse
+ * generic {@link KeyValue}s.
+ * @param is stream to read from
+ * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected
+ * compression.
+ */
+ public CompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) {
+ super(is);
+ this.decoder = compressedDecoder;
+ }
+
+ @Override
+ protected KeyValue parseCell() throws IOException {
+      // read the marker
+ int marker = this.in.read();
+ if (marker < 0) {
+ throw new EOFException(
+            "Unexpected end of stream found while reading next (Indexed) KeyValue");
+ }
+
+      // do the normal thing, if it's a regular kv
+ if (marker == REGULAR_KEY_VALUE_MARKER) {
+ if (!this.decoder.advance()) {
+ throw new IOException("Could not read next key-value from generic KeyValue Decoder!");
+ }
+ return this.decoder.current();
+ }
+
+      // it's an IndexedKeyValue, so parse it out specially
+ return KeyValueCodec.readKeyValue((DataInput) this.in);
+ }
+ }
+
+ /**
+ * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
+ * compression.
+ */
+ private static class IndexKeyValueEncoder extends BaseEncoder {
+ public IndexKeyValueEncoder(OutputStream os) {
+ super(os);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ }
+
+ @Override
+ public void write(KeyValue cell) throws IOException {
+ // make sure we are open
+ checkFlushed();
+
+ // use the standard encoding mechanism
+ KeyValueCodec.write((DataOutput) this.out, cell);
+ }
+ }
+
+ /**
+   * Write {@link IndexedKeyValue}s alongside compressed {@link KeyValue}s. This Encoder is
+ * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
+ * and uncompressed WALs that contain index entries.
+ */
+ private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
+ private Encoder compressedKvEncoder;
+
+ public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
+ super(os);
+ this.compressedKvEncoder = compressedKvEncoder;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.compressedKvEncoder.flush();
+ super.flush();
+ }
+
+ @Override
+ public void write(KeyValue cell) throws IOException {
+ //make sure we are open
+ checkFlushed();
+
+      //write the special marker so we can figure out which kind of kv it is
+ int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
+ if (cell instanceof IndexedKeyValue) {
+ marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
+ }
+ out.write(marker);
+
+ //then serialize based on the marker
+ if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
+ this.compressedKvEncoder.write(cell);
+ }
+ else{
+ KeyValueCodec.write((DataOutput) out, cell);
+ }
+ }
+ }
+}
\ No newline at end of file
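
A minimal wiring sketch (not part of this patch): on HBase 0.94.9+ the codec is selected through
configuration; the key below is the one used in the Phoenix setup documentation and is normally
set in hbase-site.xml rather than in code.

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.regionserver.wal.codec", IndexedWALEditCodec.class.getName());
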
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
new file mode 100644
index 0000000..77684c5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+
+
+/**
+ *
+ * Global root cache for the server. Each tenant is managed as a child tenant cache of this one. Queries
+ * not associated with a particular tenant use this as their tenant cache.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalCache extends TenantCacheImpl {
+ private static GlobalCache INSTANCE;
+
+ private final Configuration config;
+ // TODO: Use Guava cache with auto removal after lack of access
+ private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
+    // Cache for latest PTable for a given Phoenix table
+ private final ConcurrentHashMap<ImmutableBytesPtr,PTable> metaDataCacheMap = new ConcurrentHashMap<ImmutableBytesPtr,PTable>();
+
+ public static synchronized GlobalCache getInstance(RegionCoprocessorEnvironment env) {
+ // See http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
+        // for explanation of why double-checked locking doesn't work.
+ if (INSTANCE == null) {
+ INSTANCE = new GlobalCache(env.getConfiguration());
+ }
+ return INSTANCE;
+ }
+
+ public ConcurrentHashMap<ImmutableBytesPtr,PTable> getMetaDataCache() {
+ return metaDataCacheMap;
+ }
+
+ /**
+ * Get the tenant cache associated with the tenantId. If tenantId is not applicable, null may be
+   * used in which case the global tenant cache is returned.
+   * @param env the region coprocessor environment
+ * @param tenantId the tenant ID or null if not applicable.
+ * @return TenantCache
+ */
+ public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId) {
+ GlobalCache globalCache = GlobalCache.getInstance(env);
+ TenantCache tenantCache = tenantId == null ? globalCache : globalCache.getChildTenantCache(tenantId);
+ return tenantCache;
+ }
+
+ private GlobalCache(Configuration config) {
+ super(new GlobalMemoryManager(Runtime.getRuntime().totalMemory() *
+ config.getInt(MAX_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_PERC) / 100,
+ config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
+ config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+ this.config = config;
+ }
+
+ public Configuration getConfig() {
+ return config;
+ }
+
+ /**
+   * Retrieve the tenant cache given a tenantId.
+ * @param tenantId the ID that identifies the tenant
+ * @return the existing or newly created TenantCache
+ */
+ public TenantCache getChildTenantCache(ImmutableBytesWritable tenantId) {
+ TenantCache tenantCache = perTenantCacheMap.get(tenantId);
+ if (tenantCache == null) {
+ int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+ int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+ TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
+ tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
+ if (tenantCache == null) {
+ tenantCache = newTenantCache;
+ }
+ }
+ return tenantCache;
+ }
+}
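
A minimal lookup sketch for the tenant-cache resolution described above (not part of this patch);
env is a RegionCoprocessorEnvironment available inside a coprocessor hook, and the tenant id bytes
are illustrative.

    ImmutableBytesWritable tenantId = new ImmutableBytesWritable(Bytes.toBytes("acme"));
    TenantCache cache = GlobalCache.getTenantCache(env, tenantId); // null tenantId -> global cache
    MemoryManager mm = cache.getMemoryManager(); // per-tenant child of the global memory manager
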
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
new file mode 100644
index 0000000..4260a50
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.http.annotation.Immutable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Encapsulates a hash cache deserialized from bytes into a Map.
+ * The Map uses the row key as the key and the row as the value.
+ * @author jtaylor
+ * @since 0.1
+ */
+@Immutable
+public interface HashCache extends Closeable {
+ public List<Tuple> get(ImmutableBytesPtr hashKey);
+}
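
A minimal lookup sketch (not part of this patch); the join-key bytes are illustrative and hashCache
is a deserialized HashCache instance.

    ImmutableBytesPtr hashKey = new ImmutableBytesPtr(joinKeyBytes); // hypothetical key bytes
    List<Tuple> matched = hashCache.get(hashKey);
    if (matched != null) {
        for (Tuple row : matched) {
            // combine the probe-side row with each cached build-side row
        }
    }
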
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
new file mode 100644
index 0000000..fac78de
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -0,0 +1,10 @@
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.phoenix.index.IndexMaintainer;
+
+public interface IndexMetaDataCache extends Closeable {
+ public List<IndexMaintainer> getIndexMaintainers();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
new file mode 100644
index 0000000..8703c7c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+
+/**
+ *
+ * Client for sending cache to each region server
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCacheClient {
+ public static final int UUID_LENGTH = Bytes.SIZEOF_LONG;
+ private static final Log LOG = LogFactory.getLog(ServerCacheClient.class);
+ private static final Random RANDOM = new Random();
+ private final PhoenixConnection connection;
+ private final Map<Integer, TableRef> cacheUsingTableRefMap = new HashMap<Integer, TableRef>();
+
+ /**
+ * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+ * for caching during hash join processing.
+ * @param connection the client connection
+ *
+     * TODO: instead of minMaxKeyRange, have an interface for iterating through ranges, as we may be sending to
+     * servers we don't have to when the min is in the first region and the max is in the last region, especially for point queries.
+ */
+ public ServerCacheClient(PhoenixConnection connection) {
+ this.connection = connection;
+ }
+
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ /**
+ * Client-side representation of a server cache. Call {@link #close()} when usage
+     * is complete to free up the cache on the region servers.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+ public class ServerCache implements SQLCloseable {
+ private final int size;
+ private final byte[] id;
+ private final ImmutableSet<HRegionLocation> servers;
+
+ public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) {
+ this.id = id;
+ this.servers = ImmutableSet.copyOf(servers);
+ this.size = size;
+ }
+
+ /**
+ * Gets the size in bytes of hash cache
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Gets the unique identifier for this hash cache
+ */
+ public byte[] getId() {
+ return id;
+ }
+
+ /**
+ * Call to free up cache on region servers when no longer needed
+ */
+ @Override
+ public void close() throws SQLException {
+ removeServerCache(id, servers);
+ }
+
+ }
+
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+ ConnectionQueryServices services = connection.getQueryServices();
+ MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
+ List<Closeable> closeables = new ArrayList<Closeable>();
+ closeables.add(chunk);
+ ServerCache hashCacheSpec = null;
+ SQLException firstException = null;
+ final byte[] cacheId = generateId();
+ /**
+ * Execute EndPoint in parallel on each server to send compressed hash cache
+ */
+ // TODO: generalize and package as a per region server EndPoint caller
+ // (ideally this would be functionality provided by the coprocessor framework)
+ boolean success = false;
+ ExecutorService executor = services.getExecutor();
+ List<Future<Boolean>> futures = Collections.emptyList();
+ try {
+ List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+ int nRegions = locations.size();
+ // Size these based on worst case
+ futures = new ArrayList<Future<Boolean>>(nRegions);
+ Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
+ for (HRegionLocation entry : locations) {
+ // Keep track of servers we've sent to and only send once
+ if ( ! servers.contains(entry) &&
+ keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) { // Call RPC once per server
+ servers.add(entry);
+ if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
+ final byte[] key = entry.getRegionInfo().getStartKey();
+ final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+ closeables.add(htable);
+ futures.add(executor.submit(new JobCallable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ ServerCachingProtocol protocol = htable.coprocessorProxy(ServerCachingProtocol.class, key);
+ return protocol.addServerCache(connection.getTenantId() == null ? null : connection.getTenantId().getBytes(), cacheId, cachePtr, cacheFactory);
+ }
+
+ /**
+ * Defines the grouping for round robin behavior. All threads spawned to process
+ * this scan will be grouped together and time sliced with other simultaneously
+ * executing parallel scans.
+ */
+ @Override
+ public Object getJobId() {
+ return ServerCacheClient.this;
+ }
+ }));
+ } else {
+ if (LOG.isDebugEnabled()) {LOG.debug("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry");}
+ }
+ }
+
+ hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength());
+ // Execute in parallel
+ int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ for (Future<Boolean> future : futures) {
+ future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef);
+ success = true;
+ } catch (SQLException e) {
+ firstException = e;
+ } catch (Exception e) {
+ firstException = new SQLException(e);
+ } finally {
+ try {
+ if (!success) {
+ SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
+ for (Future<Boolean> future : futures) {
+ future.cancel(true);
+ }
+ }
+ } finally {
+ try {
+ Closeables.closeAll(closeables);
+ } catch (IOException e) {
+ if (firstException == null) {
+ firstException = new SQLException(e);
+ }
+ } finally {
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+ }
+ }
+ return hashCacheSpec;
+ }
+
+ /**
+ * Remove the cached table from all region servers
+     * @param cacheId unique identifier for the cache (the id of the {@link ServerCache} created by addServerCache)
+     * @param servers servers to which the cache was sent (tracked by the returned {@link ServerCache})
+ * @throws SQLException
+ * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
+ */
+ private void removeServerCache(byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
+ ConnectionQueryServices services = connection.getQueryServices();
+ Throwable lastThrowable = null;
+ TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+ byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+ HTableInterface iterateOverTable = services.getTable(tableName);
+ List<HRegionLocation> locations = services.getAllTableRegions(tableName);
+ Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
+ /**
+         * Allow for the possibility that the region we used to decide where to send our cache has split and been
+ * relocated to another region server *after* we sent it, but before we removed it. To accommodate
+ * this, we iterate through the current metadata boundaries and remove the cache once for each
+ * server that we originally sent to.
+ */
+ for (HRegionLocation entry : locations) {
+ if (remainingOnServers.contains(entry)) { // Call once per server
+ try {
+ byte[] key = entry.getRegionInfo().getStartKey();
+ ServerCachingProtocol protocol = iterateOverTable.coprocessorProxy(ServerCachingProtocol.class, key);
+ protocol.removeServerCache(connection.getTenantId() == null ? null : connection.getTenantId().getBytes(), cacheId);
+ remainingOnServers.remove(entry);
+ } catch (Throwable t) {
+ lastThrowable = t;
+ LOG.error("Error trying to remove hash cache for " + entry, t);
+ }
+ }
+ }
+ if (!remainingOnServers.isEmpty()) {
+ LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable);
+ }
+ }
+
+ /**
+     * Create an ID to keep the cached information independent across other operations.
+     * Using a simple random long, since the length of time we need this to be unique
+ * is very limited.
+ */
+ public static byte[] generateId() {
+ long rand = RANDOM.nextLong();
+ return Bytes.toBytes(rand);
+ }
+
+ public static String idToString(byte[] uuid) {
+ assert(uuid.length == Bytes.SIZEOF_LONG);
+ return Long.toString(Bytes.toLong(uuid));
+ }
+}
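
A caller-side sketch of the add/close lifecycle (not part of this patch); keyRanges, cachePtr,
cacheFactory and tableRef are assumed to come from the hash-join compile path, and only the
try/finally shape is the point here.

    ServerCacheClient client = new ServerCacheClient(connection);
    ServerCacheClient.ServerCache cache =
        client.addServerCache(keyRanges, cachePtr, cacheFactory, tableRef);
    try {
        // run the query that reads the cache on each region server
    } finally {
        cache.close(); // frees the cache on every server it was sent to
    }
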
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
new file mode 100644
index 0000000..ad40d8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+
+
+/**
+ *
+ * Interface to get and set cached values for a tenant
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface TenantCache {
+ MemoryManager getMemoryManager();
+ Closeable getServerCache(ImmutableBytesPtr cacheId);
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
new file mode 100644
index 0000000..eabeb11
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.cache.*;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.Closeables;
+
+/**
+ *
+ * Per-tenant cache on the server side. Tracks memory usage for each
+ * tenant as well, rolling usage up to the global memory manager.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class TenantCacheImpl implements TenantCache {
+ private final int maxTimeToLiveMs;
+ private final MemoryManager memoryManager;
+ private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
+
+ public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
+ this.memoryManager = memoryManager;
+ this.maxTimeToLiveMs = maxTimeToLiveMs;
+ }
+
+ @Override
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+
+ private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
+ /* Delay creation of this map until it's needed */
+ if (serverCaches == null) {
+ synchronized(this) {
+ if (serverCaches == null) {
+ serverCaches = CacheBuilder.newBuilder()
+ .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
+ .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
+ @Override
+ public void onRemoval(RemovalNotification<ImmutableBytesPtr, Closeable> notification) {
+ Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
+ }
+ })
+ .build();
+ }
+ }
+ }
+ return serverCaches;
+ }
+
+ @Override
+ public Closeable getServerCache(ImmutableBytesPtr cacheId) {
+ return getServerCaches().getIfPresent(cacheId);
+ }
+
+ @Override
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+ MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+ Closeable element = cacheFactory.newCache(cachePtr, chunk);
+ getServerCaches().put(cacheId, element);
+ return element;
+ }
+
+ @Override
+ public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException {
+ getServerCaches().invalidate(cacheId);
+ }
+}
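
A minimal sketch of the cache lifecycle this class implements (not part of this patch); the
memory manager, TTL, cachePtr and cacheFactory are assumed inputs from the ServerCachingProtocol
endpoint.

    TenantCache cache = new TenantCacheImpl(memoryManager, maxTimeToLiveMs);
    ImmutableBytesPtr cacheId = new ImmutableBytesPtr(ServerCacheClient.generateId());
    cache.addServerCache(cacheId, cachePtr, cacheFactory); // allocates a memory chunk
    Closeable entry = cache.getServerCache(cacheId);       // null once expired or removed
    cache.removeServerCache(cacheId);                      // removal listener closes the entry
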
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
new file mode 100644
index 0000000..f2ad908
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
@@ -0,0 +1,142 @@
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+/**
+ * This class abstracts a SpillFile. It is accessible on a per-page basis.
+ * For every SpillFile object a single spill file is always created.
+ * Additional overflow files are dynamically created in case the requested page index is not covered by
+ * the spill files allocated so far.
+ */
+public class SpillFile implements Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(SpillFile.class);
+ // Default size for a single spillFile 2GB
+ private static final int SPILL_FILE_SIZE = Integer.MAX_VALUE;
+ // Page size for a spill file 4K
+ static final int DEFAULT_PAGE_SIZE = 4096;
+ // Map of initial SpillFile at index 0, and overflow spillFiles
+ private Map<Integer, TempFile> tempFiles;
+
+ // Wrapper class for a TempFile: File + RandomAccessFile
+ private static class TempFile implements Closeable {
+ private RandomAccessFile rndFile;
+ private File file;
+
+ public TempFile(File file, RandomAccessFile rndFile) {
+ this.file = file;
+ this.rndFile = rndFile;
+ }
+
+ public FileChannel getChannel() {
+ return rndFile.getChannel();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Closeables.closeQuietly(rndFile.getChannel());
+ Closeables.closeQuietly(rndFile);
+
+ if (file != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deleting tempFile: " + file.getAbsolutePath());
+ }
+ try {
+ if (!file.delete()) {
+ logger.warn("Could not delete tempFile: " + file.getAbsolutePath());
+ }
+ } catch (SecurityException e) {
+ logger.warn("SecurityException thrown while deleting tempFile: " + file.getAbsolutePath(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a new SpillFile using the Java temp file creation function. The SpillFile is accessed
+ * in pages.
+ */
+ public static SpillFile createSpillFile() {
+ try {
+ return new SpillFile(createTempFile());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Could not create Spillfile " + ioe);
+ }
+ }
+
+
+ private static TempFile createTempFile() throws IOException {
+ File tempFile = File.createTempFile(UUID.randomUUID().toString(), null);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating new SpillFile: " + tempFile.getAbsolutePath());
+ }
+ RandomAccessFile file = new RandomAccessFile(tempFile, "rw");
+ file.setLength(SPILL_FILE_SIZE);
+
+ return new TempFile(tempFile, file);
+ }
+
+
+ private SpillFile(TempFile spFile) throws IOException {
+ this.tempFiles = Maps.newHashMap();
+ // Init the first pre-allocated spillFile
+ tempFiles.put(0, spFile);
+ }
+
+ /**
+ * Random access to a page of the current spill file
+ * @param index page index
+ * @return a page-sized MappedByteBuffer for the requested page
+ */
+ public MappedByteBuffer getPage(int index) {
+ try {
+ TempFile tempFile = null;
+ int fileIndex = 0;
+
+ long offset = (long) index * (long) DEFAULT_PAGE_SIZE;
+ if (offset >= SPILL_FILE_SIZE) {
+ // Offset exceeds the first SpillFile size
+ // Get the index of the file that should contain the pageID
+ fileIndex = (int) (offset / SPILL_FILE_SIZE);
+ if (!tempFiles.containsKey(fileIndex)) {
+ // Dynamically add new spillFiles if directory grows beyond
+ // max page ID.
+ tempFile = createTempFile();
+ tempFiles.put(fileIndex, tempFile);
+ }
+ }
+ tempFile = tempFiles.get(fileIndex);
+ // Map at the offset within the selected file, not the global offset,
+ // so overflow files are not grown past their pre-allocated size
+ long offsetInFile = offset - (long) fileIndex * (long) SPILL_FILE_SIZE;
+ // Channel gets buffered in file object
+ FileChannel fc = tempFile.getChannel();
+
+ return fc.map(MapMode.READ_WRITE, offsetInFile, DEFAULT_PAGE_SIZE);
+ } catch (IOException ioe) {
+ // Close resources before propagating
+ close();
+ throw new RuntimeException("Could not get page at index: " + index, ioe);
+ } catch (IllegalArgumentException iae) {
+ // Close resources before propagating
+ close();
+ throw iae;
+ }
+ }
+
+ @Override
+ public void close() {
+ for(TempFile file : tempFiles.values()) {
+ // Swallow IOExceptions
+ Closeables.closeQuietly(file);
+ }
+ }
+}
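
As a sanity check on the paging arithmetic above, here is a standalone sketch (not part of the patch; the file size and page index are made-up small values rather than the 2GB files SpillFile pre-allocates) that maps a single 4K page of a temp file read-write and reads back a byte written through the MappedByteBuffer:

    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;

    public class PageMappingSketch {
        static final int PAGE_SIZE = 4096; // same page size SpillFile uses

        public static void main(String[] args) throws Exception {
            File tmp = File.createTempFile("page-mapping-sketch", null);
            tmp.deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(tmp, "rw")) {
                raf.setLength(8L * PAGE_SIZE); // pre-size a few pages; SpillFile pre-sizes ~2GB
                FileChannel fc = raf.getChannel();

                int pageIndex = 3;                          // hypothetical page
                long offset = (long) pageIndex * PAGE_SIZE; // byte offset of that page

                // Map exactly one page read-write, as SpillFile.getPage does
                MappedByteBuffer page = fc.map(MapMode.READ_WRITE, offset, PAGE_SIZE);
                page.put(0, (byte) 42);
                System.out.println(page.get(0)); // prints 42
            }
        }
    }

Pages beyond the first pre-allocated file are handled by computing the overflow file index from the global offset and then mapping at the offset within that file, which is the arithmetic getPage performs.
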
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
new file mode 100644
index 0000000..448ffa5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache.aggcache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * Class serves as an adapter between the in-memory LRU cache and the spill data structures. It
+ * takes care of serializing / deserializing the key/value group-by tuples and spilling them to the
+ * correct spill partition.
+ */
+public class SpillManager implements Closeable {
+
+ // Wrapper class for DESERIALIZED groupby key/value tuples
+ public static class CacheEntry<T extends ImmutableBytesWritable> implements
+ Map.Entry<T, Aggregator[]> {
+
+ protected T key;
+ protected Aggregator[] aggs;
+
+ public CacheEntry(T key, Aggregator[] aggs) {
+ this.key = key;
+ this.aggs = aggs;
+ }
+
+ public Aggregator[] getValue(Configuration conf) {
+ return aggs;
+ }
+
+ public int getKeyLength() {
+ return key.getLength();
+ }
+
+ @Override
+ public Aggregator[] getValue() {
+ return aggs;
+ }
+
+ @Override
+ public Aggregator[] setValue(Aggregator[] arg0) {
+ this.aggs = arg0;
+ return aggs;
+ }
+
+ @Override
+ public T getKey() {
+ return key;
+ }
+
+ }
+
+ private final ArrayList<SpillMap> spillMaps;
+ private final int numSpillFiles;
+
+ private final ServerAggregators aggregators;
+ private final Configuration conf;
+
+ /**
+ * SpillManager takes care of spilling and loading tuples from spilled data structures
+ * @param numSpillFiles number of spill partitions / files to create
+ * @param serverAggregators aggregators used to serialize and deserialize the aggregate values
+ * @param conf server configuration
+ * @param cache the group-by query cache shared with the spill maps
+ */
+ public SpillManager(int numSpillFiles, ServerAggregators serverAggregators,
+ Configuration conf, SpillableGroupByCache.QueryCache cache) {
+ try {
+ int estValueSize = serverAggregators.getEstimatedByteSize();
+ spillMaps = Lists.newArrayList();
+ this.numSpillFiles = numSpillFiles;
+ this.aggregators = serverAggregators;
+ this.conf = conf;
+
+ // Ensure that a single element fits onto a page
+ Preconditions.checkArgument(SpillFile.DEFAULT_PAGE_SIZE > estValueSize);
+
+ // Create a list of spillFiles
+ // Each Spillfile only handles up to 2GB data
+ for (int i = 0; i < numSpillFiles; i++) {
+ SpillFile file = SpillFile.createSpillFile();
+ spillMaps.add(new SpillMap(file, SpillFile.DEFAULT_PAGE_SIZE, estValueSize, cache));
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException("Could not init the SpillManager");
+ }
+ }
+
+ // serialize a key/value tuple into a byte array
+ // WARNING: expensive
+ private byte[] serialize(ImmutableBytesPtr key, Aggregator[] aggs,
+ ServerAggregators serverAggs) throws IOException {
+
+ DataOutputStream output = null;
+ ByteArrayOutputStream bai = null;
+ try {
+ bai = new ByteArrayOutputStream();
+ output = new DataOutputStream(bai);
+ // key length
+ WritableUtils.writeVInt(output, key.getLength());
+ // key
+ output.write(key.get(), key.getOffset(), key.getLength());
+ byte[] aggsByte = serverAggs.toBytes(aggs);
+ // aggs length
+ WritableUtils.writeVInt(output, aggsByte.length);
+ // aggs
+ output.write(aggsByte);
+ return bai.toByteArray();
+ } finally {
+
+ if (bai != null) {
+ bai.close();
+ bai = null;
+ }
+ if (output != null) {
+ output.close();
+ output = null;
+ }
+ }
+ }
+
+ /**
+ * Helper method to deserialize the key part from a serialized byte array
+ * @param data serialized key/value record
+ * @return the key as an ImmutableBytesPtr
+ * @throws IOException
+ */
+ static ImmutableBytesPtr getKey(byte[] data) throws IOException {
+ DataInputStream input = null;
+ try {
+ input = new DataInputStream(new ByteArrayInputStream(data));
+ // key length
+ int keyLength = WritableUtils.readVInt(input);
+ int offset = WritableUtils.getVIntSize(keyLength);
+ // key
+ return new ImmutableBytesPtr(data, offset, keyLength);
+ } finally {
+ if (input != null) {
+ input.close();
+ input = null;
+ }
+ }
+ }
+
+
+ // Instantiate Aggregators from a serialized byte array
+ private Aggregator[] getAggregators(byte[] data) throws IOException {
+ DataInputStream input = null;
+ try {
+ input = new DataInputStream(new ByteArrayInputStream(data));
+ // key length
+ int keyLength = WritableUtils.readVInt(input);
+ int vIntKeyLength = WritableUtils.getVIntSize(keyLength);
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(data, vIntKeyLength, keyLength);
+
+ // value length
+ input.skip(keyLength);
+ int valueLength = WritableUtils.readVInt(input);
+ int vIntValLength = WritableUtils.getVIntSize(valueLength);
+ KeyValue keyValue =
+ KeyValueUtil.newKeyValue(ptr.get(), ptr.getOffset(), ptr.getLength(),
+ QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN,
+ QueryConstants.AGG_TIMESTAMP, data, vIntKeyLength + keyLength + vIntValLength, valueLength);
+ Tuple result = new SingleKeyValueTuple(keyValue);
+ TupleUtil.getAggregateValue(result, ptr);
+ KeyValueSchema schema = aggregators.getValueSchema();
+ ValueBitSet tempValueSet = ValueBitSet.newInstance(schema);
+ tempValueSet.clear();
+ tempValueSet.or(ptr);
+
+ int i = 0, maxOffset = ptr.getOffset() + ptr.getLength();
+ SingleAggregateFunction[] funcArray = aggregators.getFunctions();
+ Aggregator[] sAggs = new Aggregator[funcArray.length];
+ Boolean hasValue;
+ schema.iterator(ptr);
+ while ((hasValue = schema.next(ptr, i, maxOffset, tempValueSet)) != null) {
+ SingleAggregateFunction func = funcArray[i];
+ sAggs[i++] =
+ hasValue ? func.newServerAggregator(conf, ptr) : func
+ .newServerAggregator(conf);
+ }
+ return sAggs;
+
+ } finally {
+ Closeables.closeQuietly(input);
+ }
+ }
+
+ /**
+ * Helper function to deserialize a byte array into a CacheEntry
+ * @param <K>
+ * @param bytes
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public <K extends ImmutableBytesWritable> CacheEntry<K> toCacheEntry(byte[] bytes)
+ throws IOException {
+ ImmutableBytesPtr key = SpillManager.getKey(bytes);
+ Aggregator[] aggs = getAggregators(bytes);
+
+ return new CacheEntry<K>((K) key, aggs);
+ }
+
+ // Determines the partition, i.e. spillFile the tuple should get spilled to.
+ private int getPartition(ImmutableBytesWritable key) {
+ // Simple implementation: hash mod numFiles; mask out the sign bit so that
+ // an Integer.MIN_VALUE hash code cannot produce a negative partition
+ return (key.hashCode() & Integer.MAX_VALUE) % numSpillFiles;
+ }
+
+ /**
+ * Function that spills a key/value groupby tuple into a partition Spilling always triggers a
+ * serialize call
+ * @param key
+ * @param value
+ * @throws IOException
+ */
+ public void spill(ImmutableBytesWritable key, Aggregator[] value) throws IOException {
+ SpillMap spillMap = spillMaps.get(getPartition(key));
+ ImmutableBytesPtr keyPtr = new ImmutableBytesPtr(key);
+ byte[] data = serialize(keyPtr, value, aggregators);
+ spillMap.put(keyPtr, data);
+ }
+
+ /**
+ * Function that loads a spilled key/value groupby tuple from one of the spill partitions into
+ * the LRU cache. Loading always involves deserialization
+ * @throws IOException
+ */
+ public Aggregator[] loadEntry(ImmutableBytesWritable key) throws IOException {
+ SpillMap spillMap = spillMaps.get(getPartition(key));
+ byte[] data = spillMap.get(key);
+ if (data != null) {
+ return getAggregators(data);
+ }
+ return null;
+ }
+
+ /**
+ * Close the attached spill maps and their spill files
+ */
+ @Override
+ public void close() {
+ for (int i = 0; i < spillMaps.size(); i++) {
+ Closeables.closeQuietly(spillMaps.get(i).getSpillFile());
+ }
+ }
+
+ /**
+ * Function returns an iterator over all spilled Tuples
+ */
+ public SpillMapIterator newDataIterator() {
+ return new SpillMapIterator();
+ }
+
+ private final class SpillMapIterator implements Iterator<byte[]> {
+
+ int index = 0;
+ Iterator<byte[]> spillIter = spillMaps.get(index).iterator();
+
+ @Override
+ public boolean hasNext() {
+ // Advance past exhausted (possibly empty) partitions until one has data
+ // or all spill files have been visited
+ while (!spillIter.hasNext() && index < (numSpillFiles - 1)) {
+ // Current spillFile exhausted, get iterator over the next one
+ spillIter = spillMaps.get(++index).iterator();
+ }
+ return spillIter.hasNext();
+ }
+
+ @Override
+ public byte[] next() {
+ return spillIter.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not supported for this type of iterator");
+ }
+ }
+}
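
To make the record layout produced by SpillManager.serialize concrete, here is a small roundtrip sketch; it assumes only that hadoop-common is on the classpath, and the key and aggregator bytes are placeholders rather than real group-by tuples. The layout is a vint key length, the key bytes, a vint value length, then the serialized aggregator bytes; the partition computation at the end mirrors getPartition:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.hadoop.io.WritableUtils;

    public class SpillRecordLayoutSketch {
        // Layout: vint(keyLen) | key bytes | vint(valLen) | value bytes
        static byte[] write(byte[] key, byte[] value) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            WritableUtils.writeVInt(out, key.length);
            out.write(key);
            WritableUtils.writeVInt(out, value.length);
            out.write(value);
            out.close();
            return bytes.toByteArray();
        }

        static byte[][] read(byte[] data) throws Exception {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte[] key = new byte[WritableUtils.readVInt(in)];
            in.readFully(key);
            byte[] value = new byte[WritableUtils.readVInt(in)];
            in.readFully(value);
            return new byte[][] { key, value };
        }

        public static void main(String[] args) throws Exception {
            byte[] key = "group-key".getBytes(StandardCharsets.UTF_8); // placeholder key
            byte[] value = new byte[] { 1, 2, 3 };                     // placeholder aggregator bytes
            byte[][] roundTrip = read(write(key, value));
            System.out.println(Arrays.equals(key, roundTrip[0]));      // true
            System.out.println(Arrays.equals(value, roundTrip[1]));    // true

            // Partitioning, as in SpillManager.getPartition: hash modulo number of spill files
            int numSpillFiles = 4;
            int partition = (Arrays.hashCode(key) & Integer.MAX_VALUE) % numSpillFiles;
            System.out.println("partition=" + partition);
        }
    }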