You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2010/01/09 22:10:05 UTC
svn commit: r897547 [10/10] - in
/hadoop/hbase/branches/0.20_on_hadoop-0.18.3: ./ bin/ conf/ lib/
src/contrib/ src/contrib/ec2/ src/contrib/ec2/bin/
src/contrib/ec2/bin/image/ src/contrib/indexed/ src/contrib/indexed/lib/
src/contrib/indexed/lib/fmpp-0...
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Jan 9 21:09:59 2010
@@ -28,9 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.SortedSet;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -80,10 +78,6 @@
// Used to track own heapSize
final AtomicLong size;
- // All access must be synchronized.
- final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
- new CopyOnWriteArraySet<ChangedMemStoreObserver>();
-
/**
* Default constructor. Used for tests.
*/
@@ -131,7 +125,6 @@
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
- tellChangedMemStoreObservers();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
}
@@ -141,15 +134,6 @@
}
}
- /*
- * Tell outstanding scanners that memstore has changed.
- */
- private void tellChangedMemStoreObservers() {
- for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
- o.changedMemStore();
- }
- }
-
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
@@ -168,7 +152,7 @@
* @throws UnexpectedException
* @see {@link #snapshot()}
*/
- void clearSnapshot(final KeyValueSkipListSet ss)
+ void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
this.lock.writeLock().lock();
try {
@@ -180,7 +164,6 @@
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
- tellChangedMemStoreObservers();
}
} finally {
this.lock.writeLock().unlock();
@@ -204,7 +187,7 @@
return s;
}
- /**
+ /**
* Write a delete
* @param delete
* @return approximate size of the passed key and value.
@@ -221,7 +204,7 @@
//TODO Would be nice with if we had an iterator for this, so we could remove
//things that needs to be removed while iterating and don't have to go
//back and do it afterwards
-
+
try {
boolean notpresent = false;
List<KeyValue> deletes = new ArrayList<KeyValue>();
@@ -230,34 +213,34 @@
//Parse the delete, so that it is only done once
byte [] deleteBuffer = delete.getBuffer();
int deleteOffset = delete.getOffset();
-
+
int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-
+
short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
deleteOffset += Bytes.SIZEOF_SHORT;
int deleteRowOffset = deleteOffset;
-
+
deleteOffset += deleteRowLen;
-
+
byte deleteFamLen = deleteBuffer[deleteOffset];
deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-
+
int deleteQualifierOffset = deleteOffset;
int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
- Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
+ Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
Bytes.SIZEOF_BYTE;
-
+
deleteOffset += deleteQualifierLen;
-
+
int deleteTimestampOffset = deleteOffset;
deleteOffset += Bytes.SIZEOF_LONG;
byte deleteType = deleteBuffer[deleteOffset];
-
+
//Comparing with tail from memstore
for (KeyValue kv : tail) {
- DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
- deleteRowOffset, deleteRowLen, deleteQualifierOffset,
+ DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
+ deleteRowOffset, deleteRowLen, deleteQualifierOffset,
deleteQualifierLen, deleteTimestampOffset, deleteType,
comparator.getRawComparator());
if (res == DeleteCode.DONE) {
@@ -272,7 +255,7 @@
notpresent = this.kvset.remove(kv);
s -= heapSizeChange(kv, notpresent);
}
-
+
// Adding the delete to memstore. Add any value, as long as
// same instance each time.
s += heapSizeChange(delete, this.kvset.add(delete));
@@ -282,7 +265,7 @@
this.size.addAndGet(s);
return s;
}
-
+
/**
* @param kv Find the row that comes after this one. If null, we return the
* first.
@@ -459,7 +442,8 @@
this.lock.readLock().lock();
try {
KeyValueScanner [] scanners = new KeyValueScanner[1];
- scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
+ scanners[0] = new MemStoreScanner(this.kvset.clone(),
+ this.snapshot.clone(), this.comparator);
return scanners;
} finally {
this.lock.readLock().unlock();
@@ -533,7 +517,7 @@
void readLockUnlock() {
this.lock.readLock().unlock();
}
-
+
/**
*
* @param set memstore or snapshot
@@ -566,171 +550,11 @@
}
return false;
}
-
-
- /*
- * MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore -- both current
- * map and snapshot.
- * This behaves as if it were a real scanner but does not maintain position.
- */
- protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
- private List<KeyValue> result = new ArrayList<KeyValue>();
- private int idx = 0;
- // Make access atomic.
- private FirstOnRow firstOnNextRow = new FirstOnRow();
- // Keep reference to Set so can remove myself when closed.
- private final Set<ChangedMemStoreObserver> observers;
-
- MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
- super();
- this.observers = observers;
- this.observers.add(this);
- }
-
- public boolean seek(KeyValue key) {
- try {
- if (key == null) {
- close();
- return false;
- }
- this.firstOnNextRow.set(key);
- return cacheNextRow();
- } catch(Exception e) {
- close();
- return false;
- }
- }
-
- public KeyValue peek() {
- if (idx >= this.result.size()) {
- if (!cacheNextRow()) {
- return null;
- }
- return peek();
- }
- return result.get(idx);
- }
-
- public KeyValue next() {
- if (idx >= result.size()) {
- if (!cacheNextRow()) {
- return null;
- }
- return next();
- }
- return this.result.get(idx++);
- }
-
- /**
- * @return True if successfully cached a next row.
- */
- boolean cacheNextRow() {
- // Prevent snapshot being cleared while caching a row.
- lock.readLock().lock();
- try {
- this.result.clear();
- this.idx = 0;
- // Look at each set, kvset and snapshot.
- // Both look for matching entries for this.current row returning what
- // they
- // have as next row after this.current (or null if nothing in set or if
- // nothing follows.
- KeyValue kvsetNextRow = cacheNextRow(kvset);
- KeyValue snapshotNextRow = cacheNextRow(snapshot);
- if (kvsetNextRow == null && snapshotNextRow == null) {
- // Nothing more in memstore but we might have gotten current row
- // results
- // Indicate at end of store by setting next row to null.
- this.firstOnNextRow.set(null);
- return !this.result.isEmpty();
- } else if (kvsetNextRow != null && snapshotNextRow != null) {
- // Set current at the lowest of the two values.
- int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
- this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
- } else {
- this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
- }
- return true;
- } finally {
- lock.readLock().unlock();
- }
- }
-
- /*
- * See if set has entries for the <code>this.current</code> row. If so,
- * add them to <code>this.result</code>.
- * @param set Set to examine
- * @return Next row in passed <code>set</code> or null if nothing in this
- * passed <code>set</code>
- */
- private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
- if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
- SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
- if (tail == null || tail.isEmpty()) return null;
- KeyValue first = tail.first();
- KeyValue nextRow = null;
- for (KeyValue kv: tail) {
- if (comparator.compareRows(first, kv) != 0) {
- nextRow = kv;
- break;
- }
- this.result.add(kv);
- }
- return nextRow;
- }
-
- public void close() {
- this.firstOnNextRow.set(null);
- idx = 0;
- if (!result.isEmpty()) {
- result.clear();
- }
- this.observers.remove(this);
- }
-
- public void changedMemStore() {
- this.firstOnNextRow.reset();
- }
- }
- /*
- * Private class that holds firstOnRow and utility.
- * Usually firstOnRow is the first KeyValue we find on next row rather than
- * the absolute minimal first key (empty column, Type.Maximum, maximum ts).
- * Usually its ok being sloppy with firstOnRow letting it be the first thing
- * found on next row -- this works -- but if the memstore changes on us, reset
- * firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow
- * usually so we don't have to allocate a new KeyValue each time firstOnRow
- * is updated.
- */
- private static class FirstOnRow {
- private KeyValue firstOnRow = null;
-
- FirstOnRow() {
- super();
- }
-
- synchronized void set(final KeyValue kv) {
- this.firstOnRow = kv;
- }
-
- /* Reset firstOnRow to a 'clean', absolute firstOnRow.
- */
- synchronized void reset() {
- if (this.firstOnRow == null) return;
- this.firstOnRow =
- new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
- }
-
- synchronized KeyValue get() {
- return this.firstOnRow;
- }
- }
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
-
+ ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
+
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -744,11 +568,11 @@
* @return Size
*/
long heapSizeChange(final KeyValue kv, final boolean notpresent) {
- return notpresent ?
+ return notpresent ?
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
0;
}
-
+
/**
* Get the entire heap usage for this MemStore not including keys in the
* snapshot.
@@ -757,7 +581,7 @@
public long heapSize() {
return size.get();
}
-
+
/**
* Get the heap usage of KVs in this MemStore.
*/
@@ -806,15 +630,4 @@
LOG.info("Exiting.");
}
- /**
- * Observers want to know about MemStore changes.
- * Called when snapshot is cleared and when we make one.
- */
- interface ChangedMemStoreObserver {
- /**
- * Notify observers.
- * @throws IOException
- */
- void changedMemStore();
- }
}
Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java Sat Jan 9 21:09:59 2010
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * <p/>
+ * The memstore scanner keeps its own reference to the main and snapshot
+ * key/value sets. Keeping those references allows the scanner to be indifferent
+ * to memstore flushes. Calling the {@link #close()} method ensures that the
+ * references to those classes are null'd allowing the GC to pick them up.
+ */
+class MemStoreScanner implements KeyValueScanner {
+ private static final Log LOG = LogFactory.getLog(MemStoreScanner.class);
+
+ private static final
+ SortedSet<KeyValue> EMPTY_SET = new TreeSet<KeyValue>();
+ private static final Iterator<KeyValue> EMPTY_ITERATOR =
+ new Iterator<KeyValue>() {
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+ @Override
+ public KeyValue next() {
+ return null;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+
+ private SortedSet<KeyValue> kvsetRef;
+ private SortedSet<KeyValue> snapshotRef;
+ private KeyValue.KVComparator comparatorRef;
+ private Iterator<KeyValue> kvsetIterator;
+ private Iterator<KeyValue> snapshotIterator;
+
+ private KeyValue currentKvsetKV;
+ private KeyValue currentSnapshotKV;
+ private KeyValue nextKV;
+
+ /**
+ * Create a new memstore scanner.
+ *
+ * @param kvset the main key value set
+ * @param snapshot the snapshot set
+ * @param comparator the comparator to use
+ */
+ MemStoreScanner(KeyValueSkipListSet kvset,
+ KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) {
+ super();
+ this.kvsetRef = kvset;
+ this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET;
+ this.comparatorRef = comparator;
+ this.kvsetIterator = kvsetRef.iterator();
+ this.snapshotIterator = snapshotRef.iterator();
+ this.nextKV = currentKvsetKV = currentSnapshotKV = null;
+ }
+
+ private void fill() {
+ if (nextKV == null) {
+ if (currentSnapshotKV == null && snapshotIterator.hasNext()) {
+ currentSnapshotKV = snapshotIterator.next();
+ }
+
+ if (currentKvsetKV == null && kvsetIterator.hasNext()) {
+ currentKvsetKV = kvsetIterator.next();
+ }
+
+ if (currentSnapshotKV != null && currentKvsetKV != null) {
+ int cmp = comparatorRef.compare(currentSnapshotKV, currentKvsetKV);
+ if (cmp <= 0) {
+ nextKV = currentSnapshotKV;
+ currentSnapshotKV = null;
+ } else {
+ nextKV = currentKvsetKV;
+ currentKvsetKV = null;
+ }
+ } else if (currentSnapshotKV != null) {
+ nextKV = currentSnapshotKV;
+ currentSnapshotKV = null;
+ } else {
+ nextKV = currentKvsetKV;
+ currentKvsetKV = null;
+ }
+ }
+ }
+
+ @Override
+ public synchronized boolean seek(KeyValue key) {
+ if (key == null) {
+ close();
+ return false;
+ }
+ SortedSet<KeyValue> kvsetTail = kvsetRef.tailSet(key);
+ SortedSet<KeyValue> snapshotTail = snapshotRef != null ?
+ snapshotRef.tailSet(key) : EMPTY_SET;
+
+ kvsetIterator = kvsetTail.iterator();
+ snapshotIterator = snapshotTail.iterator();
+
+ currentKvsetKV = null;
+ currentSnapshotKV = null;
+ nextKV = null;
+
+ return kvsetIterator.hasNext() || snapshotIterator.hasNext();
+ }
+
+ @Override
+ public synchronized KeyValue peek() {
+ fill();
+ return nextKV;
+ }
+
+ @Override
+ public synchronized KeyValue next() {
+ fill();
+ KeyValue next = nextKV;
+ nextKV = null;
+ return next;
+ }
+
+ public synchronized void close() {
+ this.kvsetRef = EMPTY_SET;
+ this.snapshotRef = EMPTY_SET;
+ this.kvsetIterator = EMPTY_ITERATOR;
+ this.snapshotIterator = EMPTY_ITERATOR;
+ this.currentKvsetKV = null;
+ this.currentSnapshotKV = null;
+ this.nextKV = null;
+ }
+}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Sat Jan 9 21:09:59 2010
@@ -104,7 +104,7 @@
@Override
public boolean isDeleted(byte [] buffer, int qualifierOffset,
int qualifierLength, long timestamp) {
- if (timestamp < familyStamp) {
+ if (timestamp <= familyStamp) {
return true;
}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Jan 9 21:09:59 2010
@@ -31,6 +31,7 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
+import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -507,21 +508,12 @@
* @return true if a compaction is needed
* @throws IOException
*/
- boolean flushCache(final long logCacheFlushId) throws IOException {
- // Get the snapshot to flush. Presumes that a call to
- // this.memstore.snapshot() has happened earlier up in the chain.
- KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
+ private StoreFile flushCache(final long logCacheFlushId,
+ SortedSet<KeyValue> snapshot) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
- StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
- if (sf == null) {
- return false;
- }
- // Add new file to store files. Clear snapshot too while we have the
- // Store write lock.
- int size = updateStorefiles(logCacheFlushId, sf, snapshot);
- return size >= this.compactionThreshold;
+ return internalFlushCache(snapshot, logCacheFlushId);
}
/*
@@ -530,7 +522,7 @@
* @return StoreFile created.
* @throws IOException
*/
- private StoreFile internalFlushCache(final KeyValueSkipListSet set,
+ private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
@@ -600,20 +592,18 @@
* @param sf
* @param set That was used to make the passed file <code>p</code>.
* @throws IOException
- * @return Count of store files.
+ * @return Whether compaction is required.
*/
- private int updateStorefiles(final long logCacheFlushId,
- final StoreFile sf, final KeyValueSkipListSet set)
+ private boolean updateStorefiles(final long logCacheFlushId,
+ final StoreFile sf, final SortedSet<KeyValue> set)
throws IOException {
- int count = 0;
this.lock.writeLock().lock();
try {
this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
- count = this.storefiles.size();
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
this.memstore.clearSnapshot(set);
- return count;
+ return this.storefiles.size() >= this.compactionThreshold;
} finally {
this.lock.writeLock().unlock();
}
@@ -1556,4 +1546,41 @@
public long heapSize() {
return DEEP_OVERHEAD + this.memstore.heapSize();
}
+
+ public StoreFlusher getStoreFlusher(long cacheFlushId) {
+ return new StoreFlusherImpl(cacheFlushId);
+ }
+
+ private class StoreFlusherImpl implements StoreFlusher {
+
+ private long cacheFlushId;
+ private SortedSet<KeyValue> snapshot;
+ private StoreFile storeFile;
+
+ private StoreFlusherImpl(long cacheFlushId) {
+ this.cacheFlushId = cacheFlushId;
+ }
+
+
+ @Override
+ public void prepare() {
+ memstore.snapshot();
+ this.snapshot = memstore.getSnapshot();
+ }
+
+ @Override
+ public void flushCache() throws IOException {
+ storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+ }
+
+ @Override
+ public boolean commit() throws IOException {
+ if (storeFile == null) {
+ return false;
+ }
+ // Add new file to store files. Clear snapshot too while we have the
+ // Store write lock.
+ return Store.this.updateStorefiles(cacheFlushId,storeFile, snapshot);
+ }
+ }
}
Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Sat Jan 9 21:09:59 2010
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+/**
+ * An package protected interface for a store flushing.
+ * A store flusher carries the state requires to prepare/flush/commit the
+ * store's cache.
+ */
+interface StoreFlusher {
+
+ /**
+ * Prepare for store flush (create snapshot).
+ * <p/>
+ * Requires pausing writes.
+ * <p/>
+ * A very short operation.
+ */
+ void prepare();
+
+ /**
+ * Flush the cache (create the new store file).
+ * <p/>
+ * A lengthy operation which doesn't require locking out any function of
+ * the store.
+ *
+ * @throws IOException in case flush fails
+ */
+ void flushCache() throws IOException;
+
+ /**
+ * Commit the flush - add the store file to the store and clear the memstore
+ * snapshot.
+ * <p/>
+ * Requires pausing scans.
+ * <p/>
+ * A very short operation.
+ *
+ * @return
+ * @throws IOException
+ */
+ boolean commit() throws IOException;
+
+}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Jan 9 21:09:59 2010
@@ -247,6 +247,7 @@
// Reset the state of the Query Matcher and set to top row
matcher.reset();
- matcher.setRow(heap.peek().getRow());
+ KeyValue kv = heap.peek();
+ matcher.setRow((kv == null ? topKey : kv).getRow());
}
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Sat Jan 9 21:09:59 2010
@@ -102,6 +102,31 @@
public final MetricsIntValue memstoreSizeMB =
new MetricsIntValue("memstoreSizeMB");
+ /**
+ * Size of the compaction queue.
+ */
+ public final MetricsIntValue compactionQueueSize =
+ new MetricsIntValue("compactionQueueSize");
+
+ /**
+ * filesystem read latency
+ */
+ public final MetricsTimeVaryingRate fsReadLatency =
+ new MetricsTimeVaryingRate("fsReadLatency");
+
+ /**
+ * filesystem write latency
+ */
+ public final MetricsTimeVaryingRate fsWriteLatency =
+ new MetricsTimeVaryingRate("fsWriteLatency");
+
+ /**
+ * filesystem sync latency
+ */
+ public final MetricsTimeVaryingRate fsSyncLatency =
+ new MetricsTimeVaryingRate("fsSyncLatency");
+
+
public RegionServerMetrics() {
MetricsContext context = MetricsUtil.getContext("hbase");
metricsRecord = MetricsUtil.createRecord(context, "regionserver");
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java Sat Jan 9 21:09:59 2010
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.math.BigInteger;
+import java.math.BigDecimal;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -39,27 +40,27 @@
* HashSets, etc.
*/
public class Bytes {
-
+
/**
* Size of boolean in bytes
*/
public static final int SIZEOF_BOOLEAN = Byte.SIZE/Byte.SIZE;
-
+
/**
* Size of byte in bytes
*/
public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
-
+
/**
* Size of char in bytes
*/
public static final int SIZEOF_CHAR = Character.SIZE/Byte.SIZE;
-
+
/**
* Size of double in bytes
*/
public static final int SIZEOF_DOUBLE = Double.SIZE/Byte.SIZE;
-
+
/**
* Size of float in bytes
*/
@@ -69,7 +70,7 @@
* Size of int in bytes
*/
public static final int SIZEOF_INT = Integer.SIZE/Byte.SIZE;
-
+
/**
* Size of long in bytes
*/
@@ -80,7 +81,7 @@
*/
public static final int SIZEOF_SHORT = Short.SIZE/Byte.SIZE;
-
+
/**
* Estimate of size cost to pay beyond payload in jvm for instance of byte [].
* Estimate based on study of jhat and jprofiler numbers.
@@ -118,16 +119,27 @@
*/
public static RawComparator<byte []> BYTES_RAWCOMPARATOR =
new ByteArrayComparator();
-
+
/**
* Read byte-array written with a WritableableUtils.vint prefix.
* @param in Input to read from.
* @return byte array read off <code>in</code>
- * @throws IOException
+ * @throws IOException
*/
public static byte [] readByteArray(final DataInput in)
throws IOException {
int len = WritableUtils.readVInt(in);
+ return readByteArray(in, len);
+ }
+
+ /**
+ * Read byte-array from data input.
+ * logic,
+ * @param in Input to read from.
+ * @return byte array read off <code>in</code>
+ * @throws IOException io error
+ */
+ public static byte[] readByteArray(DataInput in, int len) throws IOException {
if (len < 0) {
throw new NegativeArraySizeException(Integer.toString(len));
}
@@ -135,7 +147,7 @@
in.readFully(result, 0, len);
return result;
}
-
+
/**
* Read byte-array written with a WritableableUtils.vint prefix.
* IOException is converted to a RuntimeException.
@@ -380,7 +392,7 @@
}
return result;
}
-
+
/**
* Convert a boolean to a byte array.
* @param b
@@ -456,7 +468,7 @@
}
return l;
}
-
+
/**
* Put a long value out to the specified byte array position.
* @param bytes the byte array
@@ -570,7 +582,7 @@
b[0] = (byte)(val);
return b;
}
-
+
/**
* Converts a byte array to an int value
* @param bytes
@@ -609,7 +621,7 @@
}
return n;
}
-
+
/**
* Put an int value out to the specified byte array position.
* @param bytes the byte array
@@ -628,7 +640,7 @@
bytes[offset] = (byte)(val);
return offset + SIZEOF_INT;
}
-
+
/**
* Convert a short value to a byte array
* @param val
@@ -679,7 +691,7 @@
n ^= bytes[offset+1] & 0xFF;
return n;
}
-
+
/**
* Put a short value out to the specified byte array position.
* @param bytes the byte array
@@ -696,7 +708,239 @@
bytes[offset] = (byte)(val);
return offset + SIZEOF_SHORT;
}
-
+
+ /**
+ * Convert a char value to a byte array
+ *
+ * @param val
+ * @return the byte array
+ */
+ public static byte[] toBytes(char val) {
+ byte[] b = new byte[SIZEOF_CHAR];
+ b[1] = (byte) (val);
+ val >>= 8;
+ b[0] = (byte) (val);
+ return b;
+ }
+
+ /**
+ * Converts a byte array to a char value
+ *
+ * @param bytes
+ * @return the char value
+ */
+ public static char toChar(byte[] bytes) {
+ return toChar(bytes, 0);
+ }
+
+
+ /**
+ * Converts a byte array to a char value
+ *
+ * @param bytes
+ * @param offset
+ * @return the char value
+ */
+ public static char toChar(byte[] bytes, int offset) {
+ return toChar(bytes, offset, SIZEOF_CHAR);
+ }
+
+ /**
+ * Converts a byte array to a char value
+ *
+ * @param bytes
+ * @param offset
+ * @param length
+ * @return the char value
+ */
+ public static char toChar(byte[] bytes, int offset, final int length) {
+ if (bytes == null || length != SIZEOF_CHAR ||
+ (offset + length > bytes.length)) {
+ return (char)-1;
+ }
+ char n = 0;
+ n ^= bytes[offset] & 0xFF;
+ n <<= 8;
+ n ^= bytes[offset + 1] & 0xFF;
+ return n;
+ }
+
+ /**
+ * Put a char value out to the specified byte array position.
+ *
+ * @param bytes the byte array
+ * @param offset position in the array
+ * @param val short to write out
+ * @return incremented offset
+ */
+ public static int putChar(byte[] bytes, int offset, char val) {
+ if (bytes == null || (bytes.length - offset < SIZEOF_CHAR)) {
+ return offset;
+ }
+ bytes[offset + 1] = (byte) (val);
+ val >>= 8;
+ bytes[offset] = (byte) (val);
+ return offset + SIZEOF_CHAR;
+ }
+
+ /**
+ * Convert a char array value to a byte array
+ *
+ * @param val
+ * @return the byte array
+ */
+ public static byte[] toBytes(char[] val) {
+ byte[] bytes = new byte[val.length * 2];
+ putChars(bytes,0,val);
+ return bytes;
+ }
+
+ /**
+ * Converts a byte array to a char array value
+ *
+ * @param bytes
+ * @return the char value
+ */
+ public static char[] toChars(byte[] bytes) {
+ return toChars(bytes, 0, bytes.length);
+ }
+
+
+ /**
+ * Converts a byte array to a char array value
+ *
+ * @param bytes
+ * @param offset
+ * @return the char value
+ */
+ public static char[] toChars(byte[] bytes, int offset) {
+ return toChars(bytes, offset, bytes.length-offset);
+ }
+
+ /**
+ * Converts a byte array to a char array value
+ *
+ * @param bytes
+ * @param offset
+ * @param length
+ * @return the char value
+ */
+ public static char[] toChars(byte[] bytes, int offset, final int length) {
+ int max = offset + length;
+ if (bytes == null || (max > bytes.length) || length %2 ==1) {
+ return null;
+ }
+
+ char[] chars = new char[length / 2];
+ for (int i = 0, j = offset; i < chars.length && j < max; i++, j += 2) {
+ char c = 0;
+ c ^= bytes[j] & 0xFF;
+ c <<= 8;
+ c ^= bytes[j + 1] & 0xFF;
+ chars[i] = c;
+ }
+ return chars;
+ }
+
+ /**
+ * Put a char array value out to the specified byte array position.
+ *
+ * @param bytes the byte array
+ * @param offset position in the array
+ * @param val short to write out
+ * @return incremented offset
+ */
+ public static int putChars(byte[] bytes, int offset, char[] val) {
+ int max = val.length * 2 + offset;
+ if (bytes == null || (bytes.length < max)) {
+ return offset;
+ }
+ for (int i=0,j=offset; i<val.length && j<max;i++, j+=2){
+ char c = val[i];
+ bytes[j + 1] = (byte) (c);
+ bytes[j] = (byte) (c >>>8);
+ }
+
+ return offset + SIZEOF_CHAR;
+ }
+
+
+ /**
+ * Convert a BigDecimal value to a byte array
+ *
+ * @param val
+ * @return the byte array
+ */
+ public static byte[] toBytes(BigDecimal val) {
+ byte[] valueBytes = val.unscaledValue().toByteArray();
+ byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+ int offset = putInt(result, 0, val.scale());
+ putBytes(result, offset, valueBytes, 0, valueBytes.length);
+ return result;
+ }
+
+ /**
+ * Converts a byte array to a BigDecimal
+ *
+ * @param bytes
+ * @return the char value
+ */
+ public static BigDecimal toBigDecimal(byte[] bytes) {
+ return toBigDecimal(bytes, 0, bytes.length);
+ }
+
+
+ /**
+ * Converts a byte array to a BigDecimal value
+ *
+ * @param bytes
+ * @param offset
+ * @return the char value
+ */
+ public static BigDecimal toBigDecimal(byte[] bytes, int offset) {
+ return toBigDecimal(bytes, offset, bytes.length);
+ }
+
+ /**
+ * Converts a byte array to a BigDecimal value
+ *
+ * @param bytes
+ * @param offset
+ * @param length
+ * @return the char value
+ */
+ public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
+ if (bytes == null || length < SIZEOF_INT + 1 ||
+ (offset + length > bytes.length)) {
+ return null;
+ }
+
+ int scale = toInt(bytes, 0);
+ byte[] tcBytes = new byte[length - SIZEOF_INT];
+ System.arraycopy(bytes, SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
+ return new BigDecimal(new BigInteger(tcBytes), scale);
+ }
+
+ /**
+ * Put a BigDecimal value out to the specified byte array position.
+ *
+ * @param bytes the byte array
+ * @param offset position in the array
+ * @param val BigDecimal to write out
+ * @return incremented offset
+ */
+ public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) {
+ if (bytes == null) {
+ return offset;
+ }
+
+ byte[] valueBytes = val.unscaledValue().toByteArray();
+ byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+ offset = putInt(result, offset, val.scale());
+ return putBytes(result, offset, valueBytes, 0, valueBytes.length);
+ }
+
+
/**
* @param vint Integer to make a vint of.
* @return Vint as bytes array.
@@ -759,7 +1003,7 @@
* Reads a zero-compressed encoded long from input stream and returns it.
* @param buffer Binary array
* @param offset Offset into array at which vint begins.
- * @throws java.io.IOException
+ * @throws java.io.IOException
* @return deserialized long from stream.
*/
public static long readVLong(final byte [] buffer, final int offset)
@@ -822,7 +1066,7 @@
(left == null || right == null || (left.length != right.length))? false:
compareTo(left, right) == 0;
}
-
+
/**
* @param b
* @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
@@ -885,7 +1129,7 @@
System.arraycopy(c, 0, result, a.length + b.length, c.length);
return result;
}
-
+
/**
* @param a
* @param length
@@ -920,7 +1164,7 @@
for(int i=0;i<length;i++) padding[i] = 0;
return add(padding,a);
}
-
+
/**
* @param a
* @param length
@@ -931,7 +1175,7 @@
for(int i=0;i<length;i++) padding[i] = 0;
return add(a,padding);
}
-
+
/**
* Split passed range. Expensive operation relatively. Uses BigInteger math.
* Useful splitting ranges for MapReduce jobs.
@@ -986,7 +1230,7 @@
result[num+1] = b;
return result;
}
-
+
/**
* @param t
* @return Array of byte arrays made from passed array of Text
@@ -1007,7 +1251,7 @@
public static byte [][] toByteArrays(final String column) {
return toByteArrays(toBytes(column));
}
-
+
/**
* @param column
* @return A byte array of a byte array where first and only entry is
@@ -1018,7 +1262,7 @@
result[0] = column;
return result;
}
-
+
/**
* Binary search for keys in indexes.
* @param arr array of byte arrays to search for
@@ -1032,7 +1276,7 @@
int length, RawComparator<byte []> comparator) {
int low = 0;
int high = arr.length - 1;
-
+
while (low <= high) {
int mid = (low+high) >>> 1;
// we have to compare in this order, because the comparator order
@@ -1046,7 +1290,7 @@
else if (cmp < 0)
high = mid - 1;
// BAM. how often does this really happen?
- else
+ else
return mid;
}
return - (low+1);
@@ -1055,13 +1299,13 @@
/**
* Bytewise binary increment/deincrement of long contained in byte array
* on given amount.
- *
+ *
* @param value - array of bytes containing long (length <= SIZEOF_LONG)
* @param amount value will be incremented on (deincremented if negative)
* @return array of bytes containing incremented long (length == SIZEOF_LONG)
* @throws IOException - if value.length > SIZEOF_LONG
*/
- public static byte [] incrementBytes(byte[] value, long amount)
+ public static byte [] incrementBytes(byte[] value, long amount)
throws IOException {
byte[] val = value;
if (val.length < SIZEOF_LONG) {
@@ -1094,7 +1338,7 @@
if (amount < 0) {
amo = -amount;
sign = -1;
- }
+ }
for(int i=0;i<value.length;i++) {
int cur = ((int)amo % 256) * sign;
amo = (amo >> 8);
@@ -1136,5 +1380,5 @@
}
return value;
}
-
+
}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java Sat Jan 9 21:09:59 2010
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +41,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
@@ -143,13 +146,18 @@
// Are there any data nodes up yet?
// Currently the safe mode check falls through if the namenode is up but no
// datanodes have reported in yet.
- while (dfs.getDataNodeStats().length == 0) {
- LOG.info("Waiting for dfs to come up...");
- try {
- Thread.sleep(wait);
- } catch (InterruptedException e) {
- //continue
+ try {
+ while (dfs.getDataNodeStats().length == 0) {
+ LOG.info("Waiting for dfs to come up...");
+ try {
+ Thread.sleep(wait);
+ } catch (InterruptedException e) {
+ //continue
+ }
}
+ } catch (IOException e) {
+ // getDataNodeStats can fail if superuser privilege is required to run
+ // the datanode report, just ignore it
}
// Make sure dfs is not in safe mode
while (dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET)) {
@@ -363,6 +371,98 @@
}
/**
+ * Returns the total overall fragmentation percentage. Includes .META. and
+ * -ROOT- as well.
+ *
+ * @param master The master defining the HBase root and file system.
+ * @return A map for each table and its percentage.
+ * @throws IOException When scanning the directory fails.
+ */
+ public static int getTotalTableFragmentation(final HMaster master)
+ throws IOException {
+ Map<String, Integer> map = getTableFragmentation(master);
+ return map != null && map.size() > 0 ? map.get("-TOTAL-").intValue() : -1;
+ }
+
+ /**
+ * Runs through the HBase rootdir and checks how many stores for each table
+ * have more than one file in them. Checks -ROOT- and .META. too. The total
+ * percentage across all tables is stored under the special key "-TOTAL-".
+ *
+ * @param master The master defining the HBase root and file system.
+ * @return A map for each table and its percentage.
+ * @throws IOException When scanning the directory fails.
+ */
+ public static Map<String, Integer> getTableFragmentation(
+ final HMaster master)
+ throws IOException {
+ Path path = master.getRootDir();
+ // since HMaster.getFileSystem() is package private
+ FileSystem fs = path.getFileSystem(master.getConfiguration());
+ return getTableFragmentation(fs, path);
+ }
+
+ /**
+ * Runs through the HBase rootdir and checks how many stores for each table
+ * have more than one file in them. Checks -ROOT- and .META. too. The total
+ * percentage across all tables is stored under the special key "-TOTAL-".
+ *
+ * @param fs The file system to use.
+ * @param hbaseRootDir The root directory to scan.
+ * @return A map for each table and its percentage.
+ * @throws IOException When scanning the directory fails.
+ */
+ public static Map<String, Integer> getTableFragmentation(
+ final FileSystem fs, final Path hbaseRootDir)
+ throws IOException {
+ Map<String, Integer> frags = new HashMap<String, Integer>();
+ int cfCountTotal = 0;
+ int cfFragTotal = 0;
+ DirFilter df = new DirFilter(fs);
+ // presumes any directory under hbase.rootdir is a table
+ FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
+ for (int i = 0; i < tableDirs.length; i++) {
+ // Skip the .log directory. All others should be tables. Inside a table,
+ // there are compaction.dir directories to skip. Otherwise, all else
+ // should be regions. Then in each region, should only be family
+ // directories. Under each of these, should be one file only.
+ Path d = tableDirs[i].getPath();
+ if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
+ continue;
+ }
+ int cfCount = 0;
+ int cfFrag = 0;
+ FileStatus [] regionDirs = fs.listStatus(d, df);
+ for (int j = 0; j < regionDirs.length; j++) {
+ Path dd = regionDirs[j].getPath();
+ if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
+ continue;
+ }
+ // else its a region name, now look in region for families
+ FileStatus [] familyDirs = fs.listStatus(dd, df);
+ for (int k = 0; k < familyDirs.length; k++) {
+ cfCount++;
+ cfCountTotal++;
+ Path family = familyDirs[k].getPath();
+ // now in family make sure only one file
+ FileStatus [] familyStatus = fs.listStatus(family);
+ if (familyStatus.length > 1) {
+ cfFrag++;
+ cfFragTotal++;
+ }
+ }
+ }
+ // compute percentage per table and store in result list
+ frags.put(d.getName(), new Integer(
+ Math.round((float) cfFrag / cfCount * 100)));
+ }
+ // set overall percentage for all tables
+ frags.put("-TOTAL-", new Integer(
+ Math.round((float) cfFragTotal / cfCountTotal * 100)));
+ return frags;
+ }
+
+ /**
* Expects to find -ROOT- directory.
* @param fs
* @param hbaseRootDir
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java Sat Jan 9 21:09:59 2010
@@ -1,24 +1,20 @@
package org.apache.hadoop.hbase.util;
import java.io.Serializable;
+import java.util.Arrays;
/**
- * A generic class for pairs.
- * @param <T1>
- * @param <T2>
+ * A generic class for immutable pairs.
+ * @param <T1>
+ * @param <T2>
*/
-public class Pair<T1, T2> implements Serializable
+public final class Pair<T1, T2> implements Serializable
{
private static final long serialVersionUID = -3986244606585552569L;
protected T1 first = null;
protected T2 second = null;
+ private int hashcode;
- /**
- * Default constructor.
- */
- public Pair()
- {
- }
/**
* Constructor
@@ -29,24 +25,38 @@
{
this.first = a;
this.second = b;
- }
- /**
- * Replace the first element of the pair.
- * @param a
- */
- public void setFirst(T1 a)
- {
- this.first = a;
- }
-
- /**
- * Replace the second element of the pair.
- * @param b
- */
- public void setSecond(T2 b)
- {
- this.second = b;
+ // generate a hash code
+ hashcode = first != null ? generateHashCode(first) : 0;
+ hashcode = 31 * hashcode + (second != null ? generateHashCode(second) : 0);
+ }
+
+ private static int generateHashCode(Object o) {
+ if (o.getClass().isArray()) {
+ if (o instanceof long[]) {
+ return Arrays.hashCode((long[]) o);
+ } else if (o instanceof int[]) {
+ return Arrays.hashCode((int[]) o);
+ } else if (o instanceof short[]) {
+ return Arrays.hashCode((short[]) o);
+ } else if (o instanceof char[]) {
+ return Arrays.hashCode((char[]) o);
+ } else if (o instanceof byte[]) {
+ return Arrays.hashCode((byte[]) o);
+ } else if (o instanceof double[]) {
+ return Arrays.hashCode((double[]) o);
+ } else if (o instanceof float[]) {
+ return Arrays.hashCode((float[]) o);
+ } else if (o instanceof boolean[]) {
+ return Arrays.hashCode((boolean[]) o);
+ } else {
+ // Not an array of primitives
+ return Arrays.hashCode((Object[]) o);
+ }
+ } else {
+ // Standard comparison
+ return o.hashCode();
+ }
}
/**
@@ -67,9 +77,55 @@
return second;
}
+ /**
+ * Creates a new instance of the pair encapsulating the supplied values.
+ *
+ * @param one the first value
+ * @param two the second value
+ * @param <T1> the type of the first element.
+ * @param <T2> the type of the second element.
+ * @return the new instance
+ */
+ public static <T1, T2> Pair<T1, T2> of(T1 one, T2 two)
+ {
+ return new Pair<T1, T2>(one, two);
+ }
+
private static boolean equals(Object x, Object y)
{
- return (x == null && y == null) || (x != null && x.equals(y));
+ // Null safe compare first
+ if (x == null || y == null) {
+ return x == y;
+ }
+
+ Class clazz = x.getClass();
+ // If they are both the same type of array
+ if (clazz.isArray() && clazz == y.getClass()) {
+ // NOTE: this section is borrowed from EqualsBuilder in commons-lang
+ if (x instanceof long[]) {
+ return Arrays.equals((long[]) x, (long[]) y);
+ } else if (x instanceof int[]) {
+ return Arrays.equals((int[]) x, (int[]) y);
+ } else if (x instanceof short[]) {
+ return Arrays.equals((short[]) x, (short[]) y);
+ } else if (x instanceof char[]) {
+ return Arrays.equals((char[]) x, (char[]) y);
+ } else if (x instanceof byte[]) {
+ return Arrays.equals((byte[]) x, (byte[]) y);
+ } else if (x instanceof double[]) {
+ return Arrays.equals((double[]) x, (double[]) y);
+ } else if (x instanceof float[]) {
+ return Arrays.equals((float[]) x, (float[]) y);
+ } else if (x instanceof boolean[]) {
+ return Arrays.equals((boolean[]) x, (boolean[]) y);
+ } else {
+ // Not an array of primitives
+ return Arrays.deepEquals((Object[]) x, (Object[]) y);
+ }
+ } else {
+ // Standard comparison
+ return x.equals(y);
+ }
}
@Override
@@ -83,12 +139,7 @@
@Override
public int hashCode()
{
- if (first == null)
- return (second == null) ? 0 : second.hashCode() + 1;
- else if (second == null)
- return first.hashCode() + 2;
- else
- return first.hashCode() * 17 + second.hashCode();
+ return hashcode;
}
@Override
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java Sat Jan 9 21:09:59 2010
@@ -22,11 +22,13 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.DataInput;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
/**
* Utility class with methods for manipulating Writable objects
@@ -193,4 +195,31 @@
}
return Bytes.toLong(c.getValue());
}
+
+ /**
+ * Reads a zero-compressed encoded long from input stream and returns it.
+ * This method is a copy of {@link WritableUtils#readVLong(java.io.DataInput)}
+ * changed to allow the first byte to be provided as an argument.
+ * todo add this method to hadoop WritableUtils and refactor the base method
+ * to use it.
+ *
+ * @param stream Binary input stream
+ * @param firstByte the first byte of the vlong
+ * @return deserialized long from stream.
+ * @throws java.io.IOException io error
+ */
+ public static long readVLong(DataInput stream, byte firstByte)
+ throws IOException {
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len - 1; idx++) {
+ byte b = stream.readByte();
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+ }
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Sat Jan 9 21:09:59 2010
@@ -379,7 +379,7 @@
try {
return readAddressOrThrow(znode, watcher);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.debug("readAddress " +znode, e);
return null;
}
}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat Jan 9 21:09:59 2010
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.Path;
@@ -59,6 +61,31 @@
shutdownDfs(cluster);
super.tearDown();
}
+
+ /**
+ * Test the findMemstoresWithEditsOlderThan method.
+ * @throws IOException
+ */
+ public void testFindMemstoresWithEditsOlderThan() throws IOException {
+ Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
+ for (int i = 0; i < 10; i++) {
+ Long l = new Long(i);
+ regionsToSeqids.put(l.toString().getBytes(), l);
+ }
+ byte [][] regions =
+ HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids);
+ assertEquals(1, regions.length);
+ assertTrue(Bytes.equals(regions[0], "0".getBytes()));
+ regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids);
+ int count = 3;
+ assertEquals(count, regions.length);
+ // Regions returned are not ordered.
+ for (int i = 0; i < count; i++) {
+ assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
+ Bytes.equals(regions[i], "1".getBytes()) ||
+ Bytes.equals(regions[i], "2".getBytes()));
+ }
+ }
/**
* Just write multiple logs then split. Before fix for HADOOP-2283, this
@@ -184,5 +211,4 @@
}
}
}
-
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Jan 9 21:09:59 2010
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,9 +44,15 @@
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import junit.framework.Assert;
/**
* Basic stand-alone testing of HRegion.
@@ -62,13 +69,13 @@
private final int MAX_VERSIONS = 2;
// Test names
- private final byte[] tableName = Bytes.toBytes("testtable");;
- private final byte[] qual1 = Bytes.toBytes("qual1");
- private final byte[] qual2 = Bytes.toBytes("qual2");
- private final byte[] qual3 = Bytes.toBytes("qual3");
- private final byte[] value1 = Bytes.toBytes("value1");
- private final byte[] value2 = Bytes.toBytes("value2");
- private final byte [] row = Bytes.toBytes("rowA");
+ protected final byte[] tableName = Bytes.toBytes("testtable");;
+ protected final byte[] qual1 = Bytes.toBytes("qual1");
+ protected final byte[] qual2 = Bytes.toBytes("qual2");
+ protected final byte[] qual3 = Bytes.toBytes("qual3");
+ protected final byte[] value1 = Bytes.toBytes("value1");
+ protected final byte[] value2 = Bytes.toBytes("value2");
+ protected final byte [] row = Bytes.toBytes("rowA");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1773,7 +1780,370 @@
}
}
}
-
+
+ /**
+ * Flushes the cache in a thread while scanning. The tests verify that the
+ * scan is coherent - e.g. the returned results are always of the same or
+ * later update as the previous results.
+ * @throws IOException scan / compact
+ * @throws InterruptedException thread join
+ */
+ public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
+ byte[] tableName = Bytes.toBytes("testFlushCacheWhileScanning");
+ byte[] family = Bytes.toBytes("family");
+ int numRows = 1000;
+ int flushAndScanInterval = 10;
+ int compactInterval = 10 * flushAndScanInterval;
+
+ String method = "testFlushCacheWhileScanning";
+ initHRegion(tableName,method, family);
+ FlushThread flushThread = new FlushThread();
+ flushThread.start();
+
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ scan.setFilter(new SingleColumnValueFilter(family, qual1,
+ CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
+
+ int expectedCount = 0;
+ List<KeyValue> res = new ArrayList<KeyValue>();
+
+ boolean toggle=true;
+ for (long i = 0; i < numRows; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.add(family, qual1, Bytes.toBytes(i % 10));
+ region.put(put);
+
+ if (i != 0 && i % compactInterval == 0) {
+ //System.out.println("iteration = " + i);
+ region.compactStores(true);
+ }
+
+ if (i % 10 == 5L) {
+ expectedCount++;
+ }
+
+ if (i != 0 && i % flushAndScanInterval == 0) {
+ res.clear();
+ InternalScanner scanner = region.getScanner(scan);
+ if (toggle) {
+ flushThread.flush();
+ }
+ while (scanner.next(res)) ;
+ if (!toggle) {
+ flushThread.flush();
+ }
+ Assert.assertEquals("i=" + i, expectedCount, res.size());
+ toggle = !toggle;
+ }
+ }
+
+ flushThread.done();
+ flushThread.join();
+ flushThread.checkNoError();
+ }
+
+ protected class FlushThread extends Thread {
+ private volatile boolean done;
+ private Throwable error = null;
+
+ public void done() {
+ done = true;
+ synchronized (this) {
+ interrupt();
+ }
+ }
+
+ public void checkNoError() {
+ if (error != null) {
+ Assert.assertNull(error);
+ }
+ }
+
+ @Override
+ public void run() {
+ done = false;
+ while (!done) {
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) {
+ if (done) {
+ break;
+ }
+ }
+ }
+ try {
+ region.flushcache();
+ } catch (IOException e) {
+ if (!done) {
+ LOG.error("Error while flusing cache", e);
+ error = e;
+ }
+ break;
+ }
+ }
+
+ }
+
+ public void flush() {
+ synchronized (this) {
+ notify();
+ }
+
+ }
+ }
+
+ /**
+ * Writes very wide records and scans for the latest every time..
+ * Flushes and compacts the region every now and then to keep things
+ * realistic.
+ *
+ * @throws IOException by flush / scan / compaction
+ * @throws InterruptedException when joining threads
+ */
+ public void testWritesWhileScanning()
+ throws IOException, InterruptedException {
+ byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+ int testCount = 100;
+ int numRows = 1;
+ int numFamilies = 10;
+ int numQualifiers = 100;
+ int flushInterval = 7;
+ int compactInterval = 5 * flushInterval;
+ byte[][] families = new byte[numFamilies][];
+ for (int i = 0; i < numFamilies; i++) {
+ families[i] = Bytes.toBytes("family" + i);
+ }
+ byte[][] qualifiers = new byte[numQualifiers][];
+ for (int i = 0; i < numQualifiers; i++) {
+ qualifiers[i] = Bytes.toBytes("qual" + i);
+ }
+
+ String method = "testWritesWhileScanning";
+ initHRegion(tableName, method, families);
+ PutThread putThread = new PutThread(numRows, families, qualifiers);
+ putThread.start();
+ FlushThread flushThread = new FlushThread();
+ flushThread.start();
+
+ Scan scan = new Scan();
+ scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+ new BinaryComparator(Bytes.toBytes("row0"))));
+
+ int expectedCount = numFamilies * numQualifiers;
+ List<KeyValue> res = new ArrayList<KeyValue>();
+
+ long prevTimestamp = 0L;
+ for (int i = 0; i < testCount; i++) {
+
+ if (i != 0 && i % compactInterval == 0) {
+ region.compactStores(true);
+ }
+
+ if (i != 0 && i % flushInterval == 0) {
+ //System.out.println("scan iteration = " + i);
+ flushThread.flush();
+ }
+
+ boolean previousEmpty = res.isEmpty();
+ res.clear();
+ InternalScanner scanner = region.getScanner(scan);
+ while (scanner.next(res)) ;
+ if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
+ Assert.assertEquals("i=" + i, expectedCount, res.size());
+ long timestamp = res.get(0).getTimestamp();
+ Assert.assertTrue(timestamp >= prevTimestamp);
+ prevTimestamp = timestamp;
+ }
+ }
+
+ putThread.done();
+ putThread.join();
+ putThread.checkNoError();
+
+ flushThread.done();
+ flushThread.join();
+ flushThread.checkNoError();
+ }
+
+ protected class PutThread extends Thread {
+ private volatile boolean done;
+ private Throwable error = null;
+ private int numRows;
+ private byte[][] families;
+ private byte[][] qualifiers;
+
+ private PutThread(int numRows, byte[][] families,
+ byte[][] qualifiers) {
+ this.numRows = numRows;
+ this.families = families;
+ this.qualifiers = qualifiers;
+ }
+
+ public void done() {
+ done = true;
+ synchronized (this) {
+ interrupt();
+ }
+ }
+
+ public void checkNoError() {
+ if (error != null) {
+ Assert.assertNull(error);
+ }
+ }
+
+ @Override
+ public void run() {
+ done = false;
+ int val = 0;
+ while (!done) {
+ try {
+ for (int r = 0; r < numRows; r++) {
+ byte[] row = Bytes.toBytes("row" + r);
+ Put put = new Put(row);
+ for (int f = 0; f < families.length; f++) {
+ for (int q = 0; q < qualifiers.length; q++) {
+ put.add(families[f], qualifiers[q], (long) val,
+ Bytes.toBytes(val));
+ }
+ }
+ region.put(put);
+ if (val > 0 && val % 47 == 0){
+ //System.out.println("put iteration = " + val);
+ Delete delete = new Delete(row, (long)val-30, null);
+ region.delete(delete, null, true);
+ }
+ val++;
+ }
+ } catch (IOException e) {
+ LOG.error("error while putting records", e);
+ error = e;
+ break;
+ }
+ }
+
+ }
+
+ }
+
+
+ /**
+ * Writes very wide records and gets the latest row every time..
+ * Flushes and compacts the region every now and then to keep things
+ * realistic.
+ *
+ * @throws IOException by flush / scan / compaction
+ * @throws InterruptedException when joining threads
+ */
+ public void testWritesWhileGetting()
+ throws IOException, InterruptedException {
+ byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+ int testCount = 200;
+ int numRows = 1;
+ int numFamilies = 10;
+ int numQualifiers = 100;
+ int flushInterval = 10;
+ int compactInterval = 10 * flushInterval;
+ byte[][] families = new byte[numFamilies][];
+ for (int i = 0; i < numFamilies; i++) {
+ families[i] = Bytes.toBytes("family" + i);
+ }
+ byte[][] qualifiers = new byte[numQualifiers][];
+ for (int i = 0; i < numQualifiers; i++) {
+ qualifiers[i] = Bytes.toBytes("qual" + i);
+ }
+
+ String method = "testWritesWhileScanning";
+ initHRegion(tableName, method, families);
+ PutThread putThread = new PutThread(numRows, families, qualifiers);
+ putThread.start();
+ FlushThread flushThread = new FlushThread();
+ flushThread.start();
+
+ Get get = new Get(Bytes.toBytes("row0"));
+ Result result = null;
+
+ int expectedCount = numFamilies * numQualifiers;
+
+ long prevTimestamp = 0L;
+ for (int i = 0; i < testCount; i++) {
+
+ if (i != 0 && i % compactInterval == 0) {
+ region.compactStores(true);
+ }
+
+ if (i != 0 && i % flushInterval == 0) {
+ //System.out.println("iteration = " + i);
+ flushThread.flush();
+ }
+
+ boolean previousEmpty = result == null || result.isEmpty();
+ result = region.get(get, null);
+ if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
+ Assert.assertEquals("i=" + i, expectedCount, result.size());
+ long timestamp =
+ result.getCellValue(families[0], qualifiers[0]).getTimestamp();
+ Assert.assertTrue(timestamp >= prevTimestamp);
+ prevTimestamp = timestamp;
+ }
+ }
+
+ putThread.done();
+ putThread.join();
+ putThread.checkNoError();
+
+ flushThread.done();
+ flushThread.join();
+ flushThread.checkNoError();
+ }
+
+
+ public void testIndexesScanWithOneDeletedRow() throws IOException {
+ byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
+ byte[] family = Bytes.toBytes("family");
+
+ //Setting up region
+ String method = "testIndexesScanWithOneDeletedRow";
+ initHRegion(tableName, method, new HBaseConfiguration(), family);
+
+ Put put = new Put(Bytes.toBytes(1L));
+ put.add(family, qual1, 1L, Bytes.toBytes(1L));
+ region.put(put);
+
+ region.flushcache();
+
+ Delete delete = new Delete(Bytes.toBytes(1L), 1L, null);
+ //delete.deleteColumn(family, qual1);
+ region.delete(delete, null, true);
+
+ put = new Put(Bytes.toBytes(2L));
+ put.add(family, qual1, 2L, Bytes.toBytes(2L));
+ region.put(put);
+
+ Scan idxScan = new Scan();
+ idxScan.addFamily(family);
+ idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+ Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
+ CompareFilter.CompareOp.GREATER_OR_EQUAL,
+ new BinaryComparator(Bytes.toBytes(0L))),
+ new SingleColumnValueFilter(family, qual1,
+ CompareFilter.CompareOp.LESS_OR_EQUAL,
+ new BinaryComparator(Bytes.toBytes(3L)))
+ )));
+ InternalScanner scanner = region.getScanner(idxScan);
+ List<KeyValue> res = new ArrayList<KeyValue>();
+
+ //long start = System.nanoTime();
+ while (scanner.next(res)) ;
+ //long end = System.nanoTime();
+ //System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D);
+ assertEquals(1L, res.size());
+
+ }
+
+
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {
@@ -1887,16 +2257,22 @@
throws IOException {
initHRegion(tableName, callingMethod, new HBaseConfiguration(), families);
}
-
- private void initHRegion (byte [] tableName, String callingMethod,
- HBaseConfiguration conf, byte [] ... families)
- throws IOException{
+
+ protected void initHRegion(byte[] tableName, String callingMethod,
+ HBaseConfiguration conf, byte[]... families)
+ throws IOException {
+ HTableDescriptor htd = constructTableDescriptor(tableName, families);
+ HRegionInfo info = new HRegionInfo(htd, null, null, false);
+ Path path = new Path(DIR + callingMethod);
+ region = HRegion.createHRegion(info, path, conf);
+ }
+
+ protected HTableDescriptor constructTableDescriptor(byte[] tableName,
+ byte[]... families) {
HTableDescriptor htd = new HTableDescriptor(tableName);
- for(byte [] family : families) {
+ for (byte[] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
- HRegionInfo info = new HRegionInfo(htd, null, null, false);
- Path path = new Path(DIR + callingMethod);
- region = HRegion.createHRegion(info, path, conf);
+ return htd;
}
}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Sat Jan 9 21:09:59 2010
@@ -217,7 +217,7 @@
private void flush(int storeFilessize) throws IOException{
this.store.snapshot();
- this.store.flushCache(id++);
+ flushStore(store, id++);
assertEquals(storeFilessize, this.store.getStorefiles().size());
assertEquals(0, this.store.memstore.kvset.size());
}
@@ -260,7 +260,7 @@
assertTrue(ret > 0);
// then flush.
- this.store.flushCache(id++);
+ flushStore(store, id++);
assertEquals(1, this.store.getStorefiles().size());
// from the one we inserted up there, and a new one
assertEquals(2, this.store.memstore.kvset.size());
@@ -286,4 +286,11 @@
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
}
+
+ private static void flushStore(Store store, long id) throws IOException {
+ StoreFlusher storeFlusher = store.getStoreFlusher(id);
+ storeFlusher.prepare();
+ storeFlusher.flushCache();
+ storeFlusher.commit();
+ }
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java Sat Jan 9 21:09:59 2010
@@ -21,8 +21,10 @@
import java.io.IOException;
import java.util.Arrays;
+import java.math.BigDecimal;
import junit.framework.TestCase;
+import junit.framework.Assert;
public class TestBytes extends TestCase {
public void testNullHashCode() {
@@ -70,6 +72,27 @@
assertTrue(Bytes.equals(parts[1], middle));
}
+ public void testToChars() throws Exception {
+ char[] chars = new char[]{'b', 'l', 'a', 'b', 'l', 'a', 'b', 'l', 'a'};
+ byte[] bytes = Bytes.toBytes(chars);
+ char[] chars2 = Bytes.toChars(bytes);
+ assertTrue(Arrays.equals(chars, chars2));
+ }
+
+ public void testToBigDecimal() throws Exception {
+ BigDecimal bd1 = new BigDecimal("3.14");
+ byte[] bytes = Bytes.toBytes(bd1);
+ BigDecimal bd2 = Bytes.toBigDecimal(bytes);
+ assertEquals(bd1, bd2);
+ }
+
+ public void testToChar() throws Exception {
+ for (char c = 0; c < Character.MAX_VALUE; c++) {
+ byte[] b = Bytes.toBytes(c);
+ assertEquals(c, Bytes.toChar(b));
+ }
+ }
+
public void testToLong() throws Exception {
long [] longs = {-1l, 123l, 122232323232l};
for (int i = 0; i < longs.length; i++) {
Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java Sat Jan 9 21:09:59 2010
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hbase.HBaseTestCase;
+
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * `
+ * Tests for the pair class.
+ */
+public class TestPair extends HBaseTestCase {
+
+ /**
+ * Test of factory method.
+ * Verifies that <code>null</code> args are permitted.
+ */
+ public void testOf() {
+ Pair<Integer, Double> p1 = Pair.of(1, 1.2);
+ Assert.assertEquals(1, (int) p1.getFirst());
+ Assert.assertEquals(1.2, p1.getSecond());
+
+ Pair<String, Integer> p2 = Pair.of("a", 1);
+ Assert.assertEquals("a", p2.getFirst());
+ Assert.assertEquals(1, (int) p2.getSecond());
+
+ p2 = Pair.of(null, (Integer) null);
+ Assert.assertNull(p2.getFirst());
+ Assert.assertNull(p2.getSecond());
+ }
+
+
+ /**
+ * Test equals() method works as expected.
+ */
+ public void testEquals() {
+ // Set up test data
+ Pair<?, ?> item1a = Pair.of(1, 1);
+ Pair<?, ?> item1b = Pair.of(1, 1);
+ Pair<?, ?> item2 = Pair.of(1, 2);
+ Pair<?, ?> item3 = Pair.of(2, 1);
+
+ // One should be equal
+ Assert.assertTrue(item1a.equals(item1b));
+
+ // Others should not be
+ Assert.assertFalse(item1a.equals(item2));
+ Assert.assertFalse(item1a.equals(item3));
+
+ // check array equals
+ Pair<?,?> item4 = Pair.of(new byte[]{3,27}, new String[]{"foo","bar"});
+ Pair<?,?> item5 = Pair.of(new byte[]{3,27}, new String[]{"foo","bar"});
+ Assert.assertTrue(item4.equals(item5));
+
+ }
+
+ /**
+ * Test equals() method works as expected.
+ */
+ public void testEqualsNullFirst() {
+ // Set up test data
+ Pair<?, ?> item1 = Pair.of(1, 1);
+ Pair<?, ?> item2 = Pair.of(null, 2);
+
+ Assert.assertFalse(item1.equals(item2));
+ Assert.assertFalse(item2.equals(item1));
+ }
+
+ /**
+ * Test equals() method works as expected.
+ */
+ public void testEqualsNullSecond() {
+ // Set up test data
+ Pair<?, ?> item1 = Pair.of(1, 1);
+ Pair<?, ?> item2 = Pair.of(1, null);
+
+ Assert.assertFalse(item1.equals(item2));
+ Assert.assertFalse(item2.equals(item1));
+ }
+
+ /**
+ * Test equals() method works as expected.
+ */
+ public void testEqualsNullBoth() {
+ // Set up test data
+ Pair<?, ?> item1 = Pair.of(null, null);
+ Pair<?, ?> item2 = Pair.of(null, null);
+
+ Assert.assertTrue(item1.equals(item2));
+ Assert.assertTrue(item2.equals(item1));
+ }
+
+ /**
+ * Test hashCode() method works correctly.
+ */
+ public void testHashCode() {
+ // Set up test data
+ Set<Pair<?, ?>> testSet = new HashSet<Pair<?, ?>>();
+ Pair<?, ?> item1a = Pair.of(1, 1);
+ Pair<?, ?> item1b = Pair.of(1, 1);
+ Pair<?, ?> item2 = Pair.of(1, 2);
+ Pair<?, ?> item3 = Pair.of(2, 1);
+
+ // Item one should be found since they share the same content.
+ testSet.add(item1a);
+ Assert.assertTrue(testSet.contains(item1b));
+
+ // Others should not be found
+ Assert.assertFalse(testSet.contains(item2));
+ Assert.assertFalse(testSet.contains(item3));
+ }
+
+ /**
+ * Test hashCode() method works correctly with primitive arrays.
+ */
+ public void testHashCodeWithPrimitiveArray() {
+ // Set up test data
+ Pair<?, ?> pair1 = Pair.of(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
+ Pair<?, ?> pair2 = Pair.of(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
+
+ Assert.assertEquals(pair1.hashCode(), pair2.hashCode());
+ Assert.assertFalse(pair1.hashCode() == Pair.of(1, 2).hashCode());
+ }
+
+ /**
+ * Test hashCode() method works correctly with object arrays.
+ */
+ public void testHashCodeWithObjectArray() {
+ // Set up test data
+ Pair<?, ?> pair1 = Pair.of(new Object[] {"one", "two", "three"}, new Object[] {"four", "five", "six"});
+ Pair<?, ?> pair2 = Pair.of(new Object[] {"one", "two", "three"}, new Object[] {"four", "five", "six"});
+
+ Assert.assertEquals(pair1.hashCode(), pair2.hashCode());
+ Assert.assertFalse(pair1.hashCode() == Pair.of(1, 2).hashCode());
+ }
+
+ /**
+ * Tests the the pair class serializes and deserializes without a problem.
+ *
+ * @throws IOException exception - should not occur
+ * @throws ClassNotFoundException exception - should not occur
+ */
+ public void testSerializable() throws IOException, ClassNotFoundException {
+ Pair<?, ?> item1 = Pair.of(5, "bla");
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bout);
+ out.writeObject(item1);
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()));
+ Assert.assertEquals(item1, in.readObject());
+ }
+
+}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp Sat Jan 9 21:09:59 2010
@@ -3,6 +3,7 @@
import="java.net.URLEncoder"
import="org.apache.hadoop.io.Text"
import="org.apache.hadoop.hbase.util.Bytes"
+ import="org.apache.hadoop.hbase.util.FSUtils"
import="org.apache.hadoop.hbase.master.HMaster"
import="org.apache.hadoop.hbase.HConstants"
import="org.apache.hadoop.hbase.master.MetaRegion"
@@ -23,6 +24,7 @@
if (interval == 0) {
interval = 1;
}
+ Map<String, Integer> frags = master.getTableFragmentation();
%><?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
@@ -48,6 +50,7 @@
<tr><td>HBase Root Directory</td><td><%= master.getRootDir().toString() %></td><td>Location of HBase home directory</td></tr>
<tr><td>Load average</td><td><%= master.getAverageLoad() %></td><td>Average number of regions per regionserver. Naive computation.</td></tr>
<tr><td>Regions On FS</td><td><%= master.countRegionsOnFS() %></td><td>Number of regions on FileSystem. Rough count.</td></tr>
+<tr><td>Fragmentation</td><td><%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
<tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWrapper().getQuorumServers() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
</table>
@@ -55,11 +58,17 @@
<%
if (rootLocation != null) { %>
<table>
-<tr><th>Table</th><th>Description</th></tr>
-<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %>"><%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %></a></td><td>The -ROOT- table holds references to all .META. regions.</td></tr>
+<tr><th>Table</th><th title="Fragmentation - Will be 0% after a major compaction and fluctuate during normal usage.">Frag.</th><th>Description</th></tr>
+<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %>"><%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %></a></td>
+<td align="center"><%= frags.get("-ROOT-") != null ? frags.get("-ROOT-").intValue() + "%" : "n/a" %></td>
+<td>The -ROOT- table holds references to all .META. regions.</td>
+</tr>
<%
if (onlineRegions != null && onlineRegions.size() > 0) { %>
-<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.META_TABLE_NAME) %>"><%= Bytes.toString(HConstants.META_TABLE_NAME) %></a></td><td>The .META. table holds references to all User Table regions</td></tr>
+<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.META_TABLE_NAME) %>"><%= Bytes.toString(HConstants.META_TABLE_NAME) %></a></td>
+<td align="center"><%= frags.get(".META.") != null ? frags.get(".META.").intValue() + "%" : "n/a" %></td>
+<td>The .META. table holds references to all User Table regions</td>
+</tr>
<% } %>
</table>
@@ -69,9 +78,12 @@
<% HTableDescriptor[] tables = new HBaseAdmin(conf).listTables();
if(tables != null && tables.length > 0) { %>
<table>
-<tr><th>Table</th><th>Description</th></tr>
+<tr><th>Table</th><th title="Fragmentation - Will be 0% after a major compaction and fluctuate during normal usage.">Frag.</th><th>Description</th></tr>
<% for(HTableDescriptor htDesc : tables ) { %>
-<tr><td><a href=/table.jsp?name=<%= htDesc.getNameAsString() %>><%= htDesc.getNameAsString() %></a> </td><td><%= htDesc.toString() %></td></tr>
+<tr><td><a href=/table.jsp?name=<%= htDesc.getNameAsString() %>><%= htDesc.getNameAsString() %></a> </td>
+<td align="center"><%= frags.get(htDesc.getNameAsString()) != null ? frags.get(htDesc.getNameAsString()).intValue() + "%" : "n/a" %></td>
+<td><%= htDesc.toString() %></td>
+</tr>
<% } %>
<p> <%= tables.length %> table(s) in set.</p>
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp Sat Jan 9 21:09:59 2010
@@ -27,6 +27,7 @@
master.getServerAddressToServerInfo();
String tableHeader = "<h2>Table Regions</h2><table><tr><th>Name</th><th>Region Server</th><th>Encoded Name</th><th>Start Key</th><th>End Key</th></tr>";
HServerAddress rootLocation = master.getRootRegionLocation();
+ Map<String, Integer> frags = master.getTableFragmentation();
%>
<?xml version="1.0" encoding="UTF-8" ?>
@@ -120,6 +121,7 @@
<table>
<tr><th>Attribute Name</th><th>Value</th><th>Description</th></tr>
<tr><td>Enabled</td><td><%= hbadmin.isTableEnabled(table.getTableName()) %></td><td>Is the table enabled</td></tr>
+ <tr><td>Fragmentation</td><td><%= frags.get(tableName) != null ? frags.get(tableName).intValue() + "%" : "n/a" %></td><td>How fragmented is the table. After a major compaction it is 0%.</td></tr>
</table>
<%
Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();