Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC
svn commit: r990018 [4/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/
bin/replication/ src/assembly/ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/filter/ s...
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Aug 27 05:01:02 2010
@@ -20,9 +20,12 @@
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.NavigableSet;
@@ -30,13 +33,36 @@ import java.util.NavigableSet;
/**
* A query matcher that is specifically designed for the scan case.
*/
-public class ScanQueryMatcher extends QueryMatcher {
+public class ScanQueryMatcher {
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
+ private byte[] stopRow;
+
+ protected TimeRange tr;
+
+ protected Filter filter;
+
+ /** Keeps track of deletes */
+ protected DeleteTracker deletes;
+
+ /** Keeps track of columns and versions */
+ protected ColumnTracker columns;
+
+ /** Key to seek to in memstore and StoreFiles */
+ protected KeyValue startKey;
+
+ /** Oldest allowed version stamp for TTL enforcement */
+ protected long oldestStamp;
+
+ /** Row comparator for the region this query is for */
+ KeyValue.KeyComparator rowComparator;
+
+ /** Row the query is on */
+ protected byte [] row;
/**
- * Constructs a QueryMatcher for a Scan.
+ * Constructs a ScanQueryMatcher for a Scan.
* @param scan
* @param family
* @param columns
@@ -50,6 +76,7 @@ public class ScanQueryMatcher extends Qu
this.oldestStamp = System.currentTimeMillis() - ttl;
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
+ this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
this.filter = scan.getFilter();
@@ -98,7 +125,7 @@ public class ScanQueryMatcher extends Qu
// could optimize this, if necessary?
// Could also be called SEEK_TO_CURRENT_ROW, but this
// should be rare/never happens.
- return MatchCode.SKIP;
+ return MatchCode.SEEK_NEXT_ROW;
}
// optimize case.
@@ -123,7 +150,7 @@ public class ScanQueryMatcher extends Qu
long timestamp = kv.getTimestamp();
if (isExpired(timestamp)) {
// done, the rest of this column will also be expired as well.
- return MatchCode.SEEK_NEXT_COL;
+ return getNextRowOrNextColumn(bytes, offset, qualLength);
}
byte type = kv.getType();
@@ -140,17 +167,39 @@ public class ScanQueryMatcher extends Qu
return MatchCode.SKIP;
}
- if (!tr.withinTimeRange(timestamp)) {
+ if (!this.deletes.isEmpty() &&
+ deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
return MatchCode.SKIP;
}
- if (!this.deletes.isEmpty() &&
- deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
+ int timestampComparison = tr.compare(timestamp);
+ if (timestampComparison >= 1) {
return MatchCode.SKIP;
+ } else if (timestampComparison <= -1) {
+ return getNextRowOrNextColumn(bytes, offset, qualLength);
}
- MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
+ /**
+ * Filters should be checked before the column trackers. If we did it the
+ * other way around, as was previously done, the ColumnTracker could
+ * increment its version counter even for a KeyValue that the Filter later
+ * discards, which would lead to incorrect results in certain cases.
+ */
+ if (filter != null) {
+ ReturnCode filterResponse = filter.filterKeyValue(kv);
+ if (filterResponse == ReturnCode.SKIP) {
+ return MatchCode.SKIP;
+ } else if (filterResponse == ReturnCode.NEXT_COL) {
+ return getNextRowOrNextColumn(bytes, offset, qualLength);
+ } else if (filterResponse == ReturnCode.NEXT_ROW) {
+ stickyNextRow = true;
+ return MatchCode.SEEK_NEXT_ROW;
+ } else if (filterResponse == ReturnCode.SEEK_NEXT_USING_HINT) {
+ return MatchCode.SEEK_NEXT_USING_HINT;
+ }
+ }
+ MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
// if SKIP -> SEEK_NEXT_COL
// if (NEXT,DONE) -> SEEK_NEXT_ROW
// if (INCLUDE) -> INCLUDE
@@ -161,37 +210,127 @@ public class ScanQueryMatcher extends Qu
return MatchCode.SEEK_NEXT_ROW;
}
- // else INCLUDE
- // if (colChecker == MatchCode.INCLUDE)
- // give the filter a chance to run.
- if (filter == null)
- return MatchCode.INCLUDE;
-
- ReturnCode filterResponse = filter.filterKeyValue(kv);
- if (filterResponse == ReturnCode.INCLUDE)
- return MatchCode.INCLUDE;
+ return MatchCode.INCLUDE;
- if (filterResponse == ReturnCode.SKIP)
- return MatchCode.SKIP;
+ }
- // else if (filterResponse == ReturnCode.NEXT_ROW)
- stickyNextRow = true;
- return MatchCode.SEEK_NEXT_ROW;
+ public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+ int qualLength) {
+ if (columns instanceof ExplicitColumnTracker) {
+ //We only come here when we know that columns is an instance of
+ //ExplicitColumnTracker so we should never have a cast exception
+ ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
+ qualLength);
+ if (columns.getColumnHint() == null) {
+ return MatchCode.SEEK_NEXT_ROW;
+ } else {
+ return MatchCode.SEEK_NEXT_COL;
+ }
+ } else {
+ return MatchCode.SEEK_NEXT_COL;
+ }
+ }
+
+ public boolean moreRowsMayExistAfter(KeyValue kv) {
+ if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
+ rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
+ kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
+ return false;
+ } else {
+ return true;
+ }
}
/**
* Set current row
* @param row
*/
- @Override
public void setRow(byte [] row) {
this.row = row;
reset();
}
- @Override
public void reset() {
- super.reset();
+ this.deletes.reset();
+ this.columns.reset();
+
stickyNextRow = false;
}
+
+ // should be in KeyValue.
+ protected boolean isDelete(byte type) {
+ return (type != KeyValue.Type.Put.getCode());
+ }
+
+ protected boolean isExpired(long timestamp) {
+ return (timestamp < oldestStamp);
+ }
+
+ /**
+ *
+ * @return the start key
+ */
+ public KeyValue getStartKey() {
+ return this.startKey;
+ }
+
+ public KeyValue getNextKeyHint(KeyValue kv) {
+ if (filter == null) {
+ return null;
+ } else {
+ return filter.getNextKeyHint(kv);
+ }
+ }
+
+ /**
+ * {@link #match} return codes. These instruct the scanner moving through
+ * memstores and StoreFiles what to do with the current KeyValue.
+ * <p>
+ * Additionally, this contains "early-out" language to tell the scanner to
+ * move on to the next File (memstore or Storefile), or to return immediately.
+ */
+ public static enum MatchCode {
+ /**
+ * Include KeyValue in the returned result
+ */
+ INCLUDE,
+
+ /**
+ * Do not include KeyValue in the returned result
+ */
+ SKIP,
+
+ /**
+ * Do not include, jump to next StoreFile or memstore (in time order)
+ */
+ NEXT,
+
+ /**
+ * Do not include, return current result
+ */
+ DONE,
+
+ /*
+ * The codes below are used only by the ScanQueryMatcher.
+ */
+
+ /**
+ * Done with the row, seek there.
+ */
+ SEEK_NEXT_ROW,
+ /**
+ * Done with column, seek to next.
+ */
+ SEEK_NEXT_COL,
+
+ /**
+ * Done with scan, thanks to the row filter.
+ */
+ DONE_SCAN,
+
+ /**
+ * Seek to the next key, which is given as a hint by the filter.
+ */
+ SEEK_NEXT_USING_HINT,
+ }
}
\ No newline at end of file
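
The most consequential behavioral change above is that the filter now runs before the column tracker in match(). A minimal, self-contained Java sketch of why that ordering matters follows; SimpleFilter and VersionTracker are illustrative stand-ins, not the HBase API. If the tracker counted versions first, a KeyValue the filter later discards would still consume one of the column's version slots.

import java.util.HashMap;
import java.util.Map;

public class MatchOrderSketch {
  enum MatchCode { INCLUDE, SKIP }

  interface SimpleFilter { boolean include(String qualifier, long ts); }

  // Hypothetical stand-in for ColumnTracker: counts versions per qualifier.
  static class VersionTracker {
    private final int maxVersions;
    private final Map<String, Integer> seen = new HashMap<String, Integer>();
    VersionTracker(int maxVersions) { this.maxVersions = maxVersions; }
    MatchCode checkColumn(String qualifier) {
      Integer prev = seen.get(qualifier);
      int count = (prev == null ? 0 : prev.intValue()) + 1;
      seen.put(qualifier, count);
      return count <= maxVersions ? MatchCode.INCLUDE : MatchCode.SKIP;
    }
  }

  // Filter first, tracker second -- the ordering this commit establishes.
  static MatchCode match(SimpleFilter f, VersionTracker t, String qual, long ts) {
    if (!f.include(qual, ts)) {
      return MatchCode.SKIP;    // the tracker never sees the discarded KV
    }
    return t.checkColumn(qual); // only surviving KVs consume version slots
  }

  public static void main(String[] args) {
    SimpleFilter evenTsOnly = new SimpleFilter() {
      public boolean include(String q, long ts) { return ts % 2 == 0; }
    };
    VersionTracker tracker = new VersionTracker(1);
    // ts=3 is filtered out and does NOT use up qualifier c1's only version:
    System.out.println(match(evenTsOnly, tracker, "c1", 3)); // SKIP
    System.out.println(match(evenTsOnly, tracker, "c1", 2)); // INCLUDE
  }
}

Under the old ordering the first, filtered-out KeyValue would have consumed the single allowed version for c1, and the second one, which passes the filter, would have been wrongly rejected.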
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Fri Aug 27 05:01:02 2010
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.regionse
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -65,15 +65,15 @@ public class ScanWildcardColumnTracker i
currentCount = 0;
if (++currentCount > maxVersions)
- return MatchCode.SKIP;
- return MatchCode.INCLUDE;
+ return ScanQueryMatcher.MatchCode.SKIP;
+ return ScanQueryMatcher.MatchCode.INCLUDE;
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
if (++currentCount > maxVersions)
- return MatchCode.SKIP; // skip to next col
- return MatchCode.INCLUDE;
+ return ScanQueryMatcher.MatchCode.SKIP; // skip to next col
+ return ScanQueryMatcher.MatchCode.INCLUDE;
}
// new col > old col
@@ -84,8 +84,8 @@ public class ScanWildcardColumnTracker i
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
- return MatchCode.SKIP;
- return MatchCode.INCLUDE;
+ return ScanQueryMatcher.MatchCode.SKIP;
+ return ScanQueryMatcher.MatchCode.INCLUDE;
}
// new col < oldcol
@@ -102,8 +102,8 @@ public class ScanWildcardColumnTracker i
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
- return MatchCode.SKIP;
- return MatchCode.INCLUDE;
+ return ScanQueryMatcher.MatchCode.SKIP;
+ return ScanQueryMatcher.MatchCode.INCLUDE;
}
@Override
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,520 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+
+/**
+ * Executes region split as a "transaction". Call {@link #prepare()} to set up
+ * the transaction, {@link #execute(OnlineRegions, CatalogTracker)} to run the
+ * transaction, and {@link #rollback(OnlineRegions)} to clean up if execute fails.
+ *
+ * <p>Here is an example of how you would use this class:
+ * <pre>
+ * SplitTransaction st = new SplitTransaction(parent, midKey);
+ * if (!st.prepare()) return;
+ * try {
+ * st.execute(myOnlineRegions, myCatalogTracker);
+ * } catch (IOException ioe) {
+ * try {
+ * st.rollback(myOnlineRegions);
+ * return;
+ * } catch (RuntimeException e) {
+ * myAbortable.abort("Failed split, abort");
+ * }
+ * }
+ * </pre>
+ */
+class SplitTransaction {
+ private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
+ private static final String SPLITDIR = "splits";
+
+ /*
+ * Region to split
+ */
+ private final HRegion parent;
+ private HRegionInfo hri_a;
+ private HRegionInfo hri_b;
+ private Path splitdir;
+
+ /*
+ * Row to split around
+ */
+ private final byte [] splitrow;
+
+ /**
+ * Types to add to the transaction journal
+ */
+ enum JournalEntry {
+ /**
+ * We created the temporary split data directory.
+ */
+ CREATE_SPLIT_DIR,
+ /**
+ * Closed the parent region.
+ */
+ CLOSED_PARENT_REGION,
+ /**
+ * The parent has been taken out of the server's online regions list.
+ */
+ OFFLINED_PARENT,
+ /**
+ * Started in on creation of the first daughter region.
+ */
+ STARTED_REGION_A_CREATION,
+ /**
+ * Started in on the creation of the second daughter region.
+ */
+ STARTED_REGION_B_CREATION
+ }
+
+ /*
+ * Journal of how far the split transaction has progressed.
+ */
+ private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+ /**
+ * Constructor
+ * @param r Region to split
+ * @param splitrow Row to split around
+ */
+ SplitTransaction(final HRegion r, final byte [] splitrow) {
+ this.parent = r;
+ this.splitrow = splitrow;
+ this.splitdir = getSplitDir(this.parent);
+ }
+
+ /**
+ * Does checks on split inputs.
+ * @return <code>true</code> if the region is splittable else
+ * <code>false</code> if it is not (e.g. it's already closed). If we
+ * return <code>true</code>, we'll have taken out the parent's
+ * <code>splitsAndClosesLock</code>, and the only way to unlock is a successful
+ * {@link #execute(OnlineRegions, CatalogTracker)} or {@link #rollback(OnlineRegions)}.
+ */
+ public boolean prepare() {
+ boolean prepared = false;
+ this.parent.lock.writeLock().lock();
+ try {
+ if (this.parent.isClosed() || this.parent.isClosing()) return prepared;
+ HRegionInfo hri = this.parent.getRegionInfo();
+ // Check splitrow.
+ byte [] startKey = hri.getStartKey();
+ byte [] endKey = hri.getEndKey();
+ if (Bytes.equals(startKey, splitrow) ||
+ !this.parent.getRegionInfo().containsRow(splitrow)) {
+ LOG.info("Split row is not inside region key range or is equal to " +
+ "startkey: " + Bytes.toString(this.splitrow));
+ return prepared;
+ }
+ long rid = getDaughterRegionIdTimestamp(hri);
+ this.hri_a = new HRegionInfo(hri.getTableDesc(), startKey, this.splitrow,
+ false, rid);
+ this.hri_b = new HRegionInfo(hri.getTableDesc(), this.splitrow, endKey,
+ false, rid);
+ prepared = true;
+ } finally {
+ if (!prepared) this.parent.lock.writeLock().unlock();
+ }
+ return prepared;
+ }
+
+ /**
+ * Calculate daughter regionid to use.
+ * @param hri Parent {@link HRegionInfo}
+ * @return Daughter region id (timestamp) to use.
+ */
+ private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+ long rid = EnvironmentEdgeManager.currentTimeMillis();
+ // The region id is a timestamp. It can't be less than the parent's, else
+ // we'd insert at the wrong location in .META. (See HBASE-710).
+ if (rid < hri.getRegionId()) {
+ LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+ " but current time here is " + rid);
+ rid = hri.getRegionId() + 1;
+ }
+ return rid;
+ }
+
+ /**
+ * Run the transaction.
+ * @param or Object that can online/offline parent region. Can be null
+ * @param ct CatalogTracker instance.
+ * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
+ * @return Regions created
+ * @see #rollback(OnlineRegions)
+ */
+ public PairOfSameType<HRegion> execute(final OnlineRegions or,
+ final CatalogTracker ct)
+ throws IOException {
+ return execute(or, ct, or != null);
+ }
+
+ /**
+ * Run the transaction.
+ * @param or Object that can online/offline parent region. Can be null (Tests
+ * will pass null).
+ * @param ct CatalogTracker instance. Can be null (for testing)
+ * @param updateMeta If <code>true</code>, update meta (set to false when testing).
+ * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
+ * @return Regions created
+ * @see #rollback(OnlineRegions)
+ */
+ PairOfSameType<HRegion> execute(final OnlineRegions or, final CatalogTracker ct,
+ final boolean updateMeta)
+ throws IOException {
+ LOG.info("Starting split of region " + this.parent);
+ if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
+ throw new SplitAndCloseWriteLockNotHeld();
+ }
+
+ // We'll need one of these later but get it now because if we fail there
+ // is nothing to undo.
+ HTable t = null;
+ if (updateMeta) t = getTable(this.parent.getConf());
+
+ createSplitDir(this.parent.getFilesystem(), this.splitdir);
+ this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
+
+ List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
+ this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
+
+ if (or != null) or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+ this.journal.add(JournalEntry.OFFLINED_PARENT);
+
+ splitStoreFiles(this.splitdir, hstoreFilesToSplit);
+ // splitStoreFiles creates daughter region dirs under the parent splits dir
+ // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+ // clean this up.
+
+ // Log to the journal that we are creating region A, the first daughter
+ // region. We could fail halfway through. If we do, we could have left
+ // stuff in fs that needs cleanup -- a storefile or two. That's why we
+ // add the entry to the journal BEFORE rather than AFTER the change.
+ this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
+ HRegion a = createDaughterRegion(this.hri_a);
+
+ // Ditto
+ this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
+ HRegion b = createDaughterRegion(this.hri_b);
+
+ // Edit parent in meta
+ if (ct != null) {
+ MetaEditor.offlineParentInMeta(ct, this.parent.getRegionInfo(),
+ a.getRegionInfo(), b.getRegionInfo());
+ }
+
+ // This is the point of no return. We are committed to the split now. Up to
+ // a failure editing parent in meta or a crash of the hosting regionserver,
+ // we could rollback (or, if crash, we could cleanup on redeploy) but now
+ // meta has been changed, we can only go forward. If the below last steps
+ // do not complete, repair has to be done by another agent. For example,
+ // basescanner, at least up till master rewrite, would add daughter rows if
+ // missing from meta. It could do this because the parent edit includes the
+ // daughter specs. In Bigtable paper, they have another mechanism where
+ // some feedback to the master somehow flags it that split is incomplete and
+ // needs fixup. Whatever the mechanism, it's a TODO that we have some fixup.
+
+ // I looked at writing the put of the parent edit above out to the WAL
+ // before changing meta, with the notion that should we fail, then on
+ // replay the offlining of the parent and the addition of the daughters
+ // into meta could be reinserted. The edits would have to be 'special',
+ // and given how our splits work (splitting by region), the replay would
+ // have to happen inside the split code. Writing the edit out to a file
+ // for the .META. region to replay, or writing it out to this region's
+ // edits file to handle on redeploy, would be whacky -- we'd be telling
+ // meta about a split during the deploy of the parent -- so instead we'd
+ // have to play the edit inside the split code somehow. That would mean
+ // stopping the split until meta had been edited, which might hold up
+ // splitting a good while.
+
+ // Finish up the meta edits. If these fail, another agent needs to do fixup
+ HRegionInfo hri = a.getRegionInfo();
+ try {
+ if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
+ hri = b.getRegionInfo();
+ if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
+ } catch (IOException e) {
+ // Don't let this out or we'll run rollback.
+ LOG.warn("Failed adding daughter " + hri.toString());
+ }
+ // This should not fail because the HTable instance we are using is not
+ // running a buffer -- it's immediately flushing its puts.
+ if (t != null) t.close();
+
+ // Unlock if successful split.
+ this.parent.lock.writeLock().unlock();
+
+ // Leaving here, the splitdir with its dross will still be in place, but
+ // since the split was successful, just leave it; it'll be cleaned up when
+ // the parent is deleted.
+ return new PairOfSameType<HRegion>(a, b);
+ }
+
+ private static Path getSplitDir(final HRegion r) {
+ return new Path(r.getRegionDir(), SPLITDIR);
+ }
+
+ /**
+ * @param fs Filesystem to use
+ * @param splitdir Directory to store temporary split data in
+ * @throws IOException If <code>splitdir</code> already exists or we fail
+ * to create it.
+ * @see #cleanupSplitDir(FileSystem, Path)
+ */
+ private static void createSplitDir(final FileSystem fs, final Path splitdir)
+ throws IOException {
+ if (fs.exists(splitdir)) throw new IOException("Splitdir already exits? " + splitdir);
+ if (!fs.mkdirs(splitdir)) throw new IOException("Failed create of " + splitdir);
+ }
+
+ private static void cleanupSplitDir(final FileSystem fs, final Path splitdir)
+ throws IOException {
+ // Splitdir may have been cleaned up by reopen of the parent dir.
+ deleteDir(fs, splitdir, false);
+ }
+
+ /**
+ * @param fs Filesystem to use
+ * @param dir Directory to delete
+ * @param mustPreExist If true, we'll throw exception if <code>dir</code>
+ * does not preexist, else we'll just pass.
+ * @throws IOException Thrown if we fail to delete passed <code>dir</code>
+ */
+ private static void deleteDir(final FileSystem fs, final Path dir,
+ final boolean mustPreExist)
+ throws IOException {
+ if (!fs.exists(dir)) {
+ if (mustPreExist) throw new IOException(dir.toString() + " does not exist!");
+ } else if (!fs.delete(dir, true)) {
+ throw new IOException("Failed delete of " + dir);
+ }
+ }
+
+ private void splitStoreFiles(final Path splitdir,
+ final List<StoreFile> hstoreFilesToSplit)
+ throws IOException {
+ if (hstoreFilesToSplit == null) {
+ // Could be null because close didn't succeed -- for now consider it fatal
+ throw new IOException("Close returned empty list of StoreFiles");
+ }
+
+ // Split each store file.
+ for (StoreFile sf: hstoreFilesToSplit) {
+ splitStoreFile(sf, splitdir);
+ }
+ }
+
+ private void splitStoreFile(final StoreFile sf, final Path splitdir)
+ throws IOException {
+ FileSystem fs = this.parent.getFilesystem();
+ byte [] family = sf.getFamily();
+ String encoded = this.hri_a.getEncodedName();
+ Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
+ encoded = this.hri_b.getEncodedName();
+ storedir = Store.getStoreHomedir(splitdir, encoded, family);
+ StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
+ }
+
+ /**
+ * @param hri
+ * @return Created daughter HRegion.
+ * @throws IOException
+ * @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
+ */
+ HRegion createDaughterRegion(final HRegionInfo hri)
+ throws IOException {
+ // Package private so unit tests have access.
+ FileSystem fs = this.parent.getFilesystem();
+ Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
+ this.splitdir, hri);
+ HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
+ this.parent.getLog(), fs, this.parent.getConf(),
+ hri, null);
+ HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
+ return r;
+ }
+
+ private static void cleanupDaughterRegion(final FileSystem fs,
+ final Path tabledir, final String encodedName)
+ throws IOException {
+ Path regiondir = HRegion.getRegionDir(tabledir, encodedName);
+ // Dir may not preexist.
+ deleteDir(fs, regiondir, false);
+ }
+
+ /*
+ * Get the daughter directories in the splits dir. The splits dir is under
+ * the parent region's directory.
+ * @param fs
+ * @param splitdir
+ * @param hri
+ * @return Path to daughter split dir.
+ * @throws IOException
+ */
+ private static Path getSplitDirForDaughter(final FileSystem fs,
+ final Path splitdir, final HRegionInfo hri)
+ throws IOException {
+ return new Path(splitdir, hri.getEncodedName());
+ }
+
+ /*
+ * @param conf Configuration to use.
+ * @return An HTable instance against the meta table that holds the parent
+ * region; it has autoFlush enabled so we immediately send puts (no
+ * buffering enabled).
+ * @throws IOException
+ */
+ private HTable getTable(final Configuration conf) throws IOException {
+ // When a region is split, the META table needs to be updated if we're
+ // splitting a 'normal' region, and the ROOT table needs to be
+ // updated if we are splitting a META region.
+ HTable t = null;
+ if (this.parent.getRegionInfo().isMetaTable()) {
+ t = new HTable(conf, HConstants.ROOT_TABLE_NAME);
+ } else {
+ t = new HTable(conf, HConstants.META_TABLE_NAME);
+ }
+ // Flush puts as we send them -- no buffering.
+ t.setAutoFlush(true);
+ return t;
+ }
+
+ /**
+ * @param or Object that can online/offline parent region. Can be passed null
+ * by unit tests.
+ * @throws IOException If thrown, rollback failed. Take drastic action.
+ */
+ public void rollback(final OnlineRegions or) throws IOException {
+ if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
+ throw new SplitAndCloseWriteLockNotHeld();
+ }
+ FileSystem fs = this.parent.getFilesystem();
+ ListIterator<JournalEntry> iterator =
+ this.journal.listIterator(this.journal.size());
+ while (iterator.hasPrevious()) {
+ JournalEntry je = iterator.previous();
+ switch(je) {
+ case CREATE_SPLIT_DIR:
+ cleanupSplitDir(fs, this.splitdir);
+ break;
+
+ case CLOSED_PARENT_REGION:
+ // So, this returns a seqid but if we just closed and then reopened, we
+ // should be ok. On close, we flushed using sequenceid obtained from
+ // hosting regionserver so no need to propagate the sequenceid returned
+ // out of initialize below up into regionserver as we normally do.
+ // TODO: Verify.
+ this.parent.initialize();
+ break;
+
+ case STARTED_REGION_A_CREATION:
+ cleanupDaughterRegion(fs, this.parent.getTableDir(),
+ this.hri_a.getEncodedName());
+ break;
+
+ case STARTED_REGION_B_CREATION:
+ cleanupDaughterRegion(fs, this.parent.getTableDir(),
+ this.hri_b.getEncodedName());
+ break;
+
+ case OFFLINED_PARENT:
+ if (or != null) or.addToOnlineRegions(this.parent);
+ break;
+
+ default:
+ throw new RuntimeException("Unhandled journal entry: " + je);
+ }
+ }
+ if (this.parent.lock.writeLock().isHeldByCurrentThread()) {
+ this.parent.lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Thrown if lock not held.
+ */
+ @SuppressWarnings("serial")
+ public class SplitAndCloseWriteLockNotHeld extends IOException {}
+
+ HRegionInfo getFirstDaughter() {
+ return hri_a;
+ }
+
+ HRegionInfo getSecondDaughter() {
+ return hri_b;
+ }
+
+ // For unit testing.
+ Path getSplitDir() {
+ return this.splitdir;
+ }
+
+ /**
+ * Clean up any split detritus that may have been left around from previous
+ * split attempts.
+ * Call this method on initial region deploy. Cleans up any mess
+ * left by previous deploys of passed <code>r</code> region.
+ * @param r
+ * @throws IOException
+ */
+ static void cleanupAnySplitDetritus(final HRegion r) throws IOException {
+ Path splitdir = getSplitDir(r);
+ FileSystem fs = r.getFilesystem();
+ if (!fs.exists(splitdir)) return;
+ // Look at the splitdir. It could have the encoded names of the daughter
+ // regions we tried to make. See if the daughter regions actually got made
+ // out under the tabledir. If here under splitdir still, then the split did
+ // not complete. Try and do cleanup. This code WILL NOT catch the case
+ // where we successfully created daughter a but regionserver crashed during
+ // the creation of region b. In this case, there'll be an orphan daughter
+ // dir in the filesystem. TODO: Fix.
+ FileStatus [] daughters = fs.listStatus(splitdir, new FSUtils.DirFilter(fs));
+ for (int i = 0; i < daughters.length; i++) {
+ cleanupDaughterRegion(fs, r.getTableDir(),
+ daughters[i].getPath().getName());
+ }
+ cleanupSplitDir(r.getFilesystem(), splitdir);
+ LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
+ }
+}
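
SplitTransaction's journal is a general crash-safety pattern: record each step in a journal before taking it, then undo in reverse order on failure. A stripped-down sketch with illustrative names (not the HBase types):

import java.util.ArrayDeque;
import java.util.Deque;

public class JournalSketch {
  enum Step { CREATED_DIR, CLOSED_PARENT, OFFLINED_PARENT }

  private final Deque<Step> journal = new ArrayDeque<Step>();

  void execute() throws Exception {
    // Journal BEFORE each change so a crash mid-step still gets cleaned up.
    journal.push(Step.CREATED_DIR);
    createDir();
    journal.push(Step.CLOSED_PARENT);
    closeParent();
    journal.push(Step.OFFLINED_PARENT);
    offlineParent();
  }

  void rollback() {
    // Undo in reverse order of execution, exactly as rollback() above walks
    // its journal with a backwards ListIterator.
    while (!journal.isEmpty()) {
      switch (journal.pop()) {
        case OFFLINED_PARENT: reonlineParent(); break;
        case CLOSED_PARENT:   reopenParent();   break;
        case CREATED_DIR:     deleteDir();      break;
      }
    }
  }

  // Placeholder effects; the real transaction touches HDFS and .META.
  void createDir() {}      void closeParent() {}  void offlineParent() {}
  void reonlineParent() {} void reopenParent() {} void deleteDir() {}
}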
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Aug 27 05:01:02 2010
@@ -25,9 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -42,9 +40,7 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -52,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.ImmutableList;
@@ -190,7 +187,7 @@ public class Store implements HeapSize {
this.storefiles = ImmutableList.copyOf(loadStoreFiles());
}
- HColumnDescriptor getFamily() {
+ public HColumnDescriptor getFamily() {
return this.family;
}
@@ -212,7 +209,7 @@ public class Store implements HeapSize {
return new Path(tabledir, new Path(encodedName,
new Path(Bytes.toString(family))));
}
-
+
/**
* Return the directory in which this store stores its
* StoreFiles
@@ -417,15 +414,17 @@ public class Store implements HeapSize {
* Write out current snapshot. Presumes {@link #snapshot()} has been called
* previously.
* @param logCacheFlushId flush sequence number
+ * @param snapshot the memstore snapshot to write out
* @return true if a compaction is needed
* @throws IOException
*/
private StoreFile flushCache(final long logCacheFlushId,
- SortedSet<KeyValue> snapshot) throws IOException {
+ SortedSet<KeyValue> snapshot,
+ TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
- return internalFlushCache(snapshot, logCacheFlushId);
+ return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
}
/*
@@ -435,7 +434,8 @@ public class Store implements HeapSize {
* @throws IOException
*/
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
- final long logCacheFlushId)
+ final long logCacheFlushId,
+ TimeRangeTracker snapshotTimeRangeTracker)
throws IOException {
StoreFile.Writer writer = null;
long flushed = 0;
@@ -450,6 +450,7 @@ public class Store implements HeapSize {
synchronized (flushLock) {
// A. Write the map out to the disk
writer = createWriterInTmp(set.size());
+ writer.setTimeRangeTracker(snapshotTimeRangeTracker);
int entries = 0;
try {
for (KeyValue kv: set) {
@@ -466,12 +467,12 @@ public class Store implements HeapSize {
writer.close();
}
}
-
+
// Write-out finished successfully, move into the right spot
Path dstPath = StoreFile.getUniqueFile(fs, homedir);
LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
fs.rename(writer.getPath(), dstPath);
-
+
StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
this.conf, this.family.getBloomFilterType(), this.inMemory);
StoreFile.Reader r = sf.createReader();
@@ -664,7 +665,7 @@ public class Store implements HeapSize {
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
(references? ", hasReferences=true,": " ") + " into " +
- region.getTmpDir() + ", seqid=" + maxId);
+ region.getTmpDir() + ", sequenceid=" + maxId);
StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
@@ -954,13 +955,6 @@ public class Store implements HeapSize {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
- boolean b = set.remove(kv);
- if (LOG.isDebugEnabled()) {
- LOG.debug(kv.toString() + " expired: " + b);
- }
- }
-
static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
return key.getTimestamp() < oldestTimestamp;
}
@@ -1200,7 +1194,7 @@ public class Store implements HeapSize {
* Return a scanner for both the memstore and the HStore files
* @throws IOException
*/
- protected KeyValueScanner getScanner(Scan scan,
+ public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {
@@ -1285,85 +1279,6 @@ public class Store implements HeapSize {
}
/**
- * Convenience method that implements the old MapFile.getClosest on top of
- * HFile Scanners. getClosest used seek to the asked-for key or just after
- * (HFile seeks to the key or just before).
- * @param s Scanner to use
- * @param kv Key to find.
- * @return True if we were able to seek the scanner to <code>b</code> or to
- * the key just after.
- * @throws IOException
- */
- static boolean getClosest(final HFileScanner s, final KeyValue kv)
- throws IOException {
- // Pass offsets to key content of a KeyValue; thats whats in the hfile index.
- int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- if (result < 0) {
- // Not in file. Will the first key do?
- if (!s.seekTo()) {
- return false;
- }
- } else if (result > 0) {
- // Less than what was asked for but maybe < because we're asking for
- // r/c/HConstants.LATEST_TIMESTAMP -- what was returned was r/c-1/SOME_TS...
- // A next will get us a r/c/SOME_TS.
- if (!s.next()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Retrieve results from this store given the specified Get parameters.
- * @param get Get operation
- * @param columns List of columns to match, can be empty (not null)
- * @param result List to add results to
- * @throws IOException
- */
- public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result)
- throws IOException {
- KeyComparator keyComparator = this.comparator.getRawComparator();
-
- // Column matching and version enforcement
- QueryMatcher matcher = new QueryMatcher(get, this.family.getName(), columns,
- this.ttl, keyComparator, versionsToReturn(get.getMaxVersions()));
- this.lock.readLock().lock();
- try {
- // Read from memstore
- if(this.memstore.get(matcher, result)) {
- // Received early-out from memstore
- return;
- }
-
- // Check if we even have storefiles
- if (this.storefiles.isEmpty()) {
- return;
- }
-
- // Get storefiles for this store
- List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
- for (StoreFile sf : Iterables.reverse(this.storefiles)) {
- StoreFile.Reader r = sf.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + sf + " has a null Reader");
- continue;
- }
- // Get a scanner that caches the block and uses pread
- storefileScanners.add(r.getScanner(true, true));
- }
-
- // StoreFileGetScan will handle reading this store's storefiles
- StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-
- // Run a GET scan and put results into the specified list
- scanner.get(result);
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- /**
* Increments the value for the given row/family/qualifier.
*
* This function will always be seen as atomic by other readers
@@ -1380,49 +1295,18 @@ public class Store implements HeapSize {
public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue)
throws IOException {
- List<KeyValue> result = new ArrayList<KeyValue>();
- KeyComparator keyComparator = this.comparator.getRawComparator();
-
- KeyValue kv = null;
- // Setting up the QueryMatcher
- Get get = new Get(row);
- NavigableSet<byte[]> qualifiers =
- new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qualifier);
- QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
- keyComparator, 1);
- // lock memstore snapshot for this critical section:
this.lock.readLock().lock();
- memstore.readLockLock();
try {
- int memstoreCode = this.memstore.getWithCode(matcher, result);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+
+ return this.memstore.updateColumnValue(row,
+ f,
+ qualifier,
+ newValue,
+ now);
- if (memstoreCode != 0) {
- // was in memstore (or snapshot)
- kv = result.get(0).clone();
- byte [] buffer = kv.getBuffer();
- int valueOffset = kv.getValueOffset();
- Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
- Bytes.SIZEOF_LONG);
- if (memstoreCode == 2) {
- // from snapshot, assign new TS
- long currTs = System.currentTimeMillis();
- if (currTs == kv.getTimestamp()) {
- currTs++; // unlikely but catastrophic
- }
- Bytes.putBytes(buffer, kv.getTimestampOffset(),
- Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
- }
- } else {
- kv = new KeyValue(row, f, qualifier,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue));
- }
- return add(kv);
- // end lock
} finally {
- memstore.readLockUnlock();
this.lock.readLock().unlock();
}
}
@@ -1436,6 +1320,7 @@ public class Store implements HeapSize {
private long cacheFlushId;
private SortedSet<KeyValue> snapshot;
private StoreFile storeFile;
+ private TimeRangeTracker snapshotTimeRangeTracker;
private StoreFlusherImpl(long cacheFlushId) {
this.cacheFlushId = cacheFlushId;
@@ -1445,11 +1330,12 @@ public class Store implements HeapSize {
public void prepare() {
memstore.snapshot();
this.snapshot = memstore.getSnapshot();
+ this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
}
@Override
public void flushCache() throws IOException {
- storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+ storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
}
@Override
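
The flush changes above thread a TimeRangeTracker from the memstore snapshot (captured in prepare()) through flushCache() down to the store file writer. A simplified sketch with toy stand-in types (not the HBase classes) of why the writer distinguishes a preset range (flush path) from a recomputed one (compaction path):

public class FlushSketch {
  static class ToyTimeRange {
    long min = -1, max = -1; // -1 means "unset", as in TimeRangeTracker
    void include(long ts) {
      if (max == -1) { min = ts; max = ts; }
      else if (ts < min) { min = ts; }
      else if (ts > max) { max = ts; }
    }
  }

  static class ToyWriter {
    ToyTimeRange range = new ToyTimeRange();
    boolean preset = false;
    // Flush: the snapshot's range is already known, so just install it.
    void setTimeRange(ToyTimeRange snapshotRange) {
      range = snapshotRange;
      preset = true;
    }
    // Compaction: no preset range, so recompute from each appended timestamp.
    void append(long ts) { if (!preset) { range.include(ts); } }
  }

  public static void main(String[] args) {
    ToyTimeRange snapshotRange = new ToyTimeRange();
    snapshotRange.include(10); snapshotRange.include(42); // done in prepare()
    ToyWriter w = new ToyWriter();
    w.setTimeRange(snapshotRange); // as internalFlushCache() now does
    w.append(40);                  // no-op: range was preset at flush time
    System.out.println(w.range.min + ".." + w.range.max); // prints 10..42
  }
}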
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Aug 27 05:01:02 2010
@@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.util.Bloo
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
@@ -105,6 +107,8 @@ public class StoreFile {
public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
/** Bloom filter Type in FileInfo */
static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
+ /** Key for timerange information in metadata */
+ static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
/** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
@@ -411,6 +415,17 @@ public class StoreFile {
this.reader.loadBloomfilter();
}
+ try {
+ byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
+ if (timerangeBytes != null) {
+ this.reader.timeRangeTracker = new TimeRangeTracker();
+ Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
+ }
+ } catch (IllegalArgumentException e) {
+ LOG.error("Error reading timestamp range data from meta -- " +
+ "proceeding without", e);
+ this.reader.timeRangeTracker = null;
+ }
return this.reader;
}
@@ -647,6 +662,14 @@ public class StoreFile {
private KVComparator kvComparator;
private KeyValue lastKv = null;
private byte[] lastByteArray = null;
+ TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+ /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
+ * been set. When flushing a memstore, we set the TimeRange and use this
+ * variable to indicate that it doesn't need to be calculated again while
+ * appending KeyValues. It is not set for compactions, where it is
+ * recalculated using only the appended KeyValues. */
+ boolean isTimeRangeTrackerSet = false;
protected HFile.Writer writer;
/**
@@ -671,6 +694,13 @@ public class StoreFile {
if (bloomType != BloomType.NONE && conf != null) {
float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
+ // Since in row+col blooms we have 2 calls to shouldSeek() instead of 1
+ // and the false positives are adding up, we should keep the error rate
+ // twice as low in order to maintain the number of false positives as
+ // desired by the user
+ if (bloomType == BloomType.ROWCOL) {
+ err /= 2;
+ }
int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
this.bloomFilter = new ByteBloomFilter(maxKeys, err,
@@ -693,7 +723,49 @@ public class StoreFile {
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
throws IOException {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
- writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
+ writer.appendFileInfo(MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(majorCompaction));
+ appendTimeRangeMetadata();
+ }
+
+ /**
+ * Add TimestampRange to Metadata
+ */
+ public void appendTimeRangeMetadata() throws IOException {
+ appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
+ }
+
+ /**
+ * Set TimeRangeTracker
+ * @param trt
+ */
+ public void setTimeRangeTracker(final TimeRangeTracker trt) {
+ this.timeRangeTracker = trt;
+ isTimeRangeTrackerSet = true;
+ }
+
+ /**
+ * If the timeRangeTracker is not set,
+ * update TimeRangeTracker to include the timestamp of this key
+ * @param kv
+ */
+ public void includeInTimeRangeTracker(final KeyValue kv) {
+ if (!isTimeRangeTrackerSet) {
+ timeRangeTracker.includeTimestamp(kv);
+ }
+ }
+
+ /**
+ * If the timeRangeTracker is not set,
+ * update TimeRangeTracker to include the timestamp of this key
+ * @param key
+ */
+ public void includeInTimeRangeTracker(final byte [] key) {
+ if (!isTimeRangeTrackerSet) {
+ timeRangeTracker.includeTimestamp(key);
+ }
}
public void append(final KeyValue kv) throws IOException {
@@ -735,7 +807,6 @@ public class StoreFile {
byte [] result = new byte[rl + ql];
System.arraycopy(kv.getBuffer(), ro, result, 0, rl);
System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
-
this.bloomFilter.add(result);
break;
default:
@@ -744,6 +815,7 @@ public class StoreFile {
}
}
writer.append(kv);
+ includeInTimeRangeTracker(kv);
}
public Path getPath() {
@@ -759,6 +831,7 @@ public class StoreFile {
}
}
writer.append(key, value);
+ includeInTimeRangeTracker(key);
}
public void close() throws IOException {
@@ -794,6 +867,7 @@ public class StoreFile {
protected BloomFilter bloomFilter = null;
protected BloomType bloomFilterType;
private final HFile.Reader reader;
+ protected TimeRangeTracker timeRangeTracker = null;
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
throws IOException {
@@ -834,20 +908,35 @@ public class StoreFile {
reader.close();
}
- public boolean shouldSeek(final byte[] row,
- final SortedSet<byte[]> columns) {
+ public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+ return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
+ }
- if (this.bloomFilter == null) {
+ /**
+ * Check if this StoreFile may contain keys within the given TimeRange.
+ * @param scan
+ * @return false if the scan's time range definitely matches nothing in this StoreFile
+ */
+ private boolean passesTimerangeFilter(Scan scan) {
+ if (timeRangeTracker == null) {
return true;
+ } else {
+ return timeRangeTracker.includesTimeRange(scan.getTimeRange());
}
+ }
+ private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
+ if (this.bloomFilter == null || !scan.isGetScan()) {
+ return true;
+ }
+ byte[] row = scan.getStartRow();
byte[] key;
switch (this.bloomFilterType) {
case ROW:
key = row;
break;
case ROWCOL:
- if (columns.size() == 1) {
+ if (columns != null && columns.size() == 1) {
byte[] col = columns.first();
key = Bytes.add(row, col);
break;
@@ -860,7 +949,17 @@ public class StoreFile {
try {
ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
if (bloom != null) {
- return this.bloomFilter.contains(key, bloom);
+ if (this.bloomFilterType == BloomType.ROWCOL) {
+ // Since a Row Delete is essentially a DeleteFamily applied to all
+ // columns, a file might be wrongly skipped when using a row+col
+ // Bloom filter. To ensure such a file is included, an additional
+ // check against the row-only Bloom is required.
+ return this.bloomFilter.contains(key, bloom) ||
+ this.bloomFilter.contains(row, bloom);
+ }
+ else {
+ return this.bloomFilter.contains(key, bloom);
+ }
}
} catch (IOException e) {
LOG.error("Error reading bloom filter data -- proceeding without",
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Fri Aug 27 05:01:02 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionse
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import java.io.IOException;
@@ -104,6 +105,20 @@ class StoreFileScanner implements KeyVal
}
}
+ public boolean reseek(KeyValue key) throws IOException {
+ try {
+ if (!reseekAtOrAfter(hfs, key)) {
+ close();
+ return false;
+ }
+ cur = hfs.getKeyValue();
+ hfs.next();
+ return true;
+ } catch (IOException ioe) {
+ throw new IOException("Could not seek " + this, ioe);
+ }
+ }
+
public void close() {
// Nothing to close on HFileScanner?
cur = null;
@@ -131,9 +146,21 @@ class StoreFileScanner implements KeyVal
return true;
}
- // Bloom filter hook.
- public boolean shouldSeek(final byte[] row,
- final SortedSet<byte[]> columns) {
- return reader.shouldSeek(row, columns);
+ static boolean reseekAtOrAfter(HFileScanner s, KeyValue k)
+ throws IOException {
+ //This function is similar to seekAtOrAfter function
+ int result = s.reseekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+ if (result <= 0) {
+ return true;
+ } else {
+ // passed KV is larger than current KV in file, if there is a next
+ // it is after, if not then this scanner is done.
+ return s.next();
+ }
+ }
+
+ // StoreFile filter hook.
+ public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+ return reader.shouldSeek(scan, columns);
}
}
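
The new reseek() differs from seek() in its contract: the caller guarantees the target key is at or after the scanner's current position, so reseekAtOrAfter() can move forward from where it stands instead of repositioning from the top of the file. A toy forward-only cursor, illustrative only:

public class ReseekSketch {
  // Forward-only advance, in the spirit of HFileScanner.reseekTo above.
  static int reseekAtOrAfter(long[] keys, int pos, long target) {
    while (pos < keys.length && keys[pos] < target) {
      pos++; // never moves backwards; cheap relative to a full seek
    }
    return pos < keys.length ? pos : -1; // -1: the scanner is exhausted
  }

  public static void main(String[] args) {
    long[] keys = {10, 20, 30, 40};
    System.out.println(reseekAtOrAfter(keys, 1, 35)); // 3 (lands on 40)
    System.out.println(reseekAtOrAfter(keys, 3, 99)); // -1 (past the end)
  }
}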
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Aug 27 05:01:02 2010
@@ -154,30 +154,24 @@ class StoreScanner implements KeyValueSc
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
- // exclude scan files that have failed file filters
+ // include only those scan files which pass all filters
for (StoreFileScanner sfs : sfScanners) {
- if (isGet &&
- !sfs.shouldSeek(scan.getStartRow(), columns)) {
- continue; // exclude this hfs
+ if (sfs.shouldSeek(scan, columns)) {
+ scanners.add(sfs);
}
- scanners.add(sfs);
}
// Then the memstore scanners
- scanners.addAll(this.store.memstore.getScanners());
+ if (this.store.memstore.shouldSeek(scan)) {
+ scanners.addAll(this.store.memstore.getScanners());
+ }
return scanners;
}
public synchronized KeyValue peek() {
- try {
- checkReseek();
- } catch (IOException e) {
- throw new RuntimeException("IOE conversion", e);
- }
if (this.heap == null) {
- return null;
+ return this.lastTop;
}
-
return this.heap.peek();
}
@@ -233,11 +227,16 @@ class StoreScanner implements KeyValueSc
return false;
}
- matcher.setRow(peeked.getRow());
+ // only call setRow if the row changes; avoids confusing the query matcher
+ // if scanning intra-row
+ if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+ matcher.setRow(peeked.getRow());
+ }
+
KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>();
LOOP: while((kv = this.heap.peek()) != null) {
- QueryMatcher.MatchCode qcode = matcher.match(kv);
+ ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
//DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {
case INCLUDE:
@@ -262,6 +261,10 @@ class StoreScanner implements KeyValueSc
return false;
case SEEK_NEXT_ROW:
+ if (!matcher.moreRowsMayExistAfter(kv)) {
+ outResult.addAll(results);
+ return false;
+ }
heap.next();
break;
@@ -276,6 +279,15 @@ class StoreScanner implements KeyValueSc
this.heap.next();
break;
+ case SEEK_NEXT_USING_HINT:
+ KeyValue nextKV = matcher.getNextKeyHint(kv);
+ if (nextKV != null) {
+ reseek(nextKV);
+ } else {
+ heap.next();
+ }
+ break;
+
default:
throw new RuntimeException("UNEXPECTED");
}
@@ -321,18 +333,20 @@ class StoreScanner implements KeyValueSc
private void checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
-
- reseek(this.lastTop);
+ resetScannerStack(this.lastTop);
this.lastTop = null; // gone!
}
// else dont need to reseek
}
- private void reseek(KeyValue lastTopKey) throws IOException {
+ private void resetScannerStack(KeyValue lastTopKey) throws IOException {
if (heap != null) {
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
+ /* When we have the scan object, should we not pass it to getScanners()
+ * to get a limited set of scanners? We did so in the constructor and we
+ * could have done it now by storing the scan object from the constructor */
List<KeyValueScanner> scanners = getScanners();
for(KeyValueScanner scanner : scanners) {
@@ -347,4 +361,11 @@ class StoreScanner implements KeyValueSc
KeyValue kv = heap.peek();
matcher.setRow((kv == null ? lastTopKey : kv).getRow());
}
+
+ @Override
+ public synchronized boolean reseek(KeyValue kv) throws IOException {
+ //Heap cannot be null, because this is only called from next() which
+ //guarantees that heap will never be null before this call.
+ return this.heap.reseek(kv);
+ }
}
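
The SEEK_NEXT_USING_HINT case above lets a filter's key hint replace a long run of heap.next() calls with a single reseek. A toy illustration with hypothetical numbers; the real flow is matcher.getNextKeyHint() followed by reseek():

import java.util.TreeSet;

public class SeekHintSketch {
  public static void main(String[] args) {
    TreeSet<Long> keys = new TreeSet<Long>();
    for (long k = 0; k < 1000; k++) {
      keys.add(k);
    }
    long current = 3;
    long hint = 750;                // e.g. from a timestamp-skipping filter
    Long next = keys.ceiling(hint); // one reseek instead of ~747 next() calls
    System.out.println("jumped from " + current + " to " + next);
  }
}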
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Stores the minimum and maximum timestamp values, and can be used to find
+ * whether any given time range overlaps with its time range. MemStores use
+ * this class to track their minimum and maximum timestamps. When writing
+ * StoreFiles, this information is stored in meta blocks and used at read
+ * time to match against the required TimeRange.
+ */
+public class TimeRangeTracker implements Writable {
+
+ long minimumTimestamp = -1;
+ long maximumTimestamp = -1;
+
+ /**
+ * Default constructor.
+ * Leaves the time range unset (both timestamps -1).
+ */
+ public TimeRangeTracker() {
+
+ }
+
+ /**
+ * Copy Constructor
+ * @param trt source TimeRangeTracker
+ */
+ public TimeRangeTracker(final TimeRangeTracker trt) {
+ this.minimumTimestamp = trt.getMinimumTimestamp();
+ this.maximumTimestamp = trt.getMaximumTimestamp();
+ }
+
+ public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
+ this.minimumTimestamp = minimumTimestamp;
+ this.maximumTimestamp = maximumTimestamp;
+ }
+
+ /**
+ * Update the current TimestampRange to include the timestamp from the KeyValue.
+ * If the KeyValue is of type DeleteColumn or DeleteFamily, it includes the
+ * entire time range from 0 to the timestamp of the key.
+ * @param kv the KeyValue to include
+ */
+ public void includeTimestamp(final KeyValue kv) {
+ includeTimestamp(kv.getTimestamp());
+ if (kv.isDeleteColumnOrFamily()) {
+ includeTimestamp(0);
+ }
+ }
+
+ /**
+ * Update the current TimestampRange to include the timestamp from the key.
+ * If the key is of type DeleteColumn or DeleteFamily, it includes the
+ * entire time range from 0 to the timestamp of the key.
+ * @param key serialized key whose timestamp and type to inspect
+ */
+ public void includeTimestamp(final byte[] key) {
+ includeTimestamp(Bytes.toLong(key, key.length - KeyValue.TIMESTAMP_TYPE_SIZE));
+ int type = key[key.length - 1];
+ if (type == Type.DeleteColumn.getCode() ||
+ type == Type.DeleteFamily.getCode()) {
+ includeTimestamp(0);
+ }
+ }
+
+ /**
+ * If required, update the current TimestampRange to include the given timestamp.
+ * @param timestamp the timestamp value to include
+ */
+ private void includeTimestamp(final long timestamp) {
+ if (maximumTimestamp == -1) {
+ minimumTimestamp = timestamp;
+ maximumTimestamp = timestamp;
+ } else if (minimumTimestamp > timestamp) {
+ minimumTimestamp = timestamp;
+ } else if (maximumTimestamp < timestamp) {
+ maximumTimestamp = timestamp;
+ }
+ }
+
+ /**
+ * Check if this range has any overlap with the given TimeRange.
+ * @param tr TimeRange to check against
+ * @return true if there is overlap, false otherwise
+ */
+ public boolean includesTimeRange(final TimeRange tr) {
+ return (this.minimumTimestamp < tr.getMax() &&
+ this.maximumTimestamp >= tr.getMin());
+ }
+
+ /**
+ * @return the minimumTimestamp
+ */
+ public long getMinimumTimestamp() {
+ return minimumTimestamp;
+ }
+
+ /**
+ * @return the maximumTimestamp
+ */
+ public long getMaximumTimestamp() {
+ return maximumTimestamp;
+ }
+
+ public void write(final DataOutput out) throws IOException {
+ out.writeLong(minimumTimestamp);
+ out.writeLong(maximumTimestamp);
+ }
+
+ public void readFields(final DataInput in) throws IOException {
+ this.minimumTimestamp = in.readLong();
+ this.maximumTimestamp = in.readLong();
+ }
+
+}
+
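
For reference, a minimal sketch of how a write path might fold timestamps into the new tracker and how a read path could then test for overlap. The class name and literal values below are made up; it assumes the 0.90-era KeyValue(row, family, qualifier, timestamp, value) constructor and a TimeRange constructor that throws IOException.

    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.TimeRange;
    import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimeRangeTrackerSketch {
      public static void main(String [] args) throws IOException {
        TimeRangeTracker trt = new TimeRangeTracker();
        // Write path: fold each KeyValue's timestamp into the tracked range.
        trt.includeTimestamp(new KeyValue(Bytes.toBytes("r1"),
            Bytes.toBytes("f"), Bytes.toBytes("q"), 100L, Bytes.toBytes("v")));
        trt.includeTimestamp(new KeyValue(Bytes.toBytes("r2"),
            Bytes.toBytes("f"), Bytes.toBytes("q"), 200L, Bytes.toBytes("v")));
        // Read path: a scan over [150, 300) overlaps the tracked [100, 200],
        // so this memstore/StoreFile cannot be skipped.
        System.out.println(trt.includesTimeRange(new TimeRange(150L, 300L))); // true
        // A scan over [300, 400) does not overlap and could be skipped.
        System.out.println(trt.includesTimeRange(new TimeRange(300L, 400L))); // false
      }
    }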
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 27 05:01:02 2010
@@ -28,6 +28,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -36,11 +37,13 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -51,6 +54,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -59,6 +63,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -120,6 +125,15 @@ public class HLog implements Syncable {
static final Log LOG = LogFactory.getLog(HLog.class);
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
+
+ /*
+ * Name of directory that holds recovered edits written by the wal log
+ * splitting code, one per region
+ */
+ private static final String RECOVERED_EDITS_DIR = "recovered.edits";
+ private static final Pattern EDITFILES_NAME_PATTERN =
+ Pattern.compile("-?[0-9]+");
+
private final FileSystem fs;
private final Path dir;
private final Configuration conf;
@@ -142,11 +156,6 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- /** Name of file that holds recovered edits written by the wal log splitting
- * code, one per region
- */
- public static final String RECOVERED_EDITS = "recovered.edits";
-
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@@ -220,6 +229,9 @@ public class HLog implements Syncable {
*/
private final LogSyncer logSyncerThread;
+ private final List<LogEntryVisitor> logEntryVisitors =
+ new CopyOnWriteArrayList<LogEntryVisitor>();
+
/**
* Pattern used to validate a HLog file name
*/
@@ -388,7 +400,7 @@ public class HLog implements Syncable {
!this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
// This could spin on occasion but better the occasional spin than locking
// every increment of sequence number.
- LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
+ LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
}
}
@@ -434,22 +446,27 @@ public class HLog implements Syncable {
if (closed) {
return regionsToFlush;
}
+ // Do all the preparation outside of the updateLock so that incoming
+ // writes are blocked for as short a time as possible.
+ long currentFilenum = this.filenum;
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename();
+ HLog.Writer nextWriter = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+ int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected field name.
+ OutputStream nextHdfsOut = null;
+ if (nextWriter instanceof SequenceFileLogWriter) {
+ nextHdfsOut =
+ ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
+ }
synchronized (updateLock) {
// Clean up current writer.
- Path oldFile = cleanupCurrentWriter(this.filenum);
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
- this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
- this.initialReplication = fs.getFileStatus(newPath).getReplication();
-
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- this.hdfs_out = null;
- if (this.writer instanceof SequenceFileLogWriter) {
- this.hdfs_out =
- ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
- }
+ Path oldFile = cleanupCurrentWriter(currentFilenum);
+ this.writer = nextWriter;
+ this.initialReplication = nextInitialReplication;
+ this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
@@ -457,28 +474,28 @@ public class HLog implements Syncable {
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
- // Tell our listeners that a new log was created
- if (!this.actionListeners.isEmpty()) {
- for (LogActionsListener list : this.actionListeners) {
- list.logRolled(newPath);
- }
+ this.numEntries.set(0);
+ }
+ // Tell our listeners that a new log was created
+ if (!this.actionListeners.isEmpty()) {
+ for (LogActionsListener list : this.actionListeners) {
+ list.logRolled(newPath);
}
- // Can we delete any of the old log files?
- if (this.outputfiles.size() > 0) {
- if (this.lastSeqWritten.size() <= 0) {
- LOG.debug("Last sequence written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
- // flushed (and removed from the lastSeqWritten map). Means can
- // remove all but currently open log file.
- for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
- archiveLogFile(e.getValue(), e.getKey());
- }
- this.outputfiles.clear();
- } else {
- regionsToFlush = cleanOldLogs();
+ }
+ // Can we delete any of the old log files?
+ if (this.outputfiles.size() > 0) {
+ if (this.lastSeqWritten.size() <= 0) {
+ LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+ // If so, then no new writes have come in since all regions were
+ // flushed (and removed from the lastSeqWritten map). Means can
+ // remove all but currently open log file.
+ for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+ archiveLogFile(e.getValue(), e.getKey());
}
+ this.outputfiles.clear();
+ } else {
+ regionsToFlush = cleanOldLogs();
}
- this.numEntries.set(0);
}
} finally {
this.cacheFlushLock.unlock();
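
The reshuffled rollWriter above follows a common pattern: do the expensive setup (writer creation, replication lookup, reflection) before taking updateLock, so writers pause only for a reference swap. A generic sketch of that pattern; all names here are hypothetical stand-ins, not HBase API.

    import java.io.Writer;

    class RollSketch {
      private final Object updateLock = new Object();
      private Writer current;

      Writer roll(Writer next) {
        // "next" was created by the caller with no lock held (the slow part).
        Writer old;
        synchronized (updateLock) {
          old = current;   // writers are blocked only for this swap
          current = next;
        }
        return old;        // the old writer is cleaned up outside the lock
      }
    }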
@@ -560,7 +577,7 @@ public class HLog implements Syncable {
byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
LOG.debug("Found " + logsToRemove + " hlogs to remove " +
" out of total " + this.outputfiles.size() + "; " +
- "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+ "oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
" from region " + Bytes.toString(oldestRegion));
}
for (Long seq : sequenceNumbers) {
@@ -650,7 +667,7 @@ public class HLog implements Syncable {
throw e;
}
if (currentfilenum >= 0) {
- oldFile = computeFilename();
+ oldFile = computeFilename(currentfilenum);
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
@@ -660,22 +677,27 @@ public class HLog implements Syncable {
private void archiveLogFile(final Path p, final Long seqno) throws IOException {
Path newPath = getHLogArchivePath(this.oldLogDir, p);
LOG.info("moving old hlog file " + FSUtils.getPath(p) +
- " whose highest sequence/edit id is " + seqno + " to " +
+ " whose highest sequenceid is " + seqno + " to " +
FSUtils.getPath(newPath));
this.fs.rename(p, newPath);
- if (!this.actionListeners.isEmpty()) {
- for (LogActionsListener list : this.actionListeners) {
- list.logArchived(p, newPath);
- }
- }
}
/**
- * This is a convenience method that computes a new filename with a given
- * file-number.
+ * This is a convenience method that computes a new filename
+ * using the current HLog file-number.
* @return Path
*/
protected Path computeFilename() {
+ return computeFilename(this.filenum);
+ }
+
+ /**
+ * This is a convenience method that computes a new filename with a given
+ * file-number.
+ * @param filenum file number to use
+ * @return Path
+ */
+ protected Path computeFilename(long filenum) {
if (filenum < 0) {
throw new RuntimeException("hlog file number can't be < 0");
}
@@ -997,10 +1019,13 @@ public class HLog implements Syncable {
* If the pipeline isn't started yet or is empty, you will get the default
* replication factor. Therefore, if this function returns 0, it means you
* are not properly running with the HDFS-826 patch.
+ * @throws InvocationTargetException
+ * @throws IllegalAccessException
+ * @throws IllegalArgumentException
*
* @throws Exception
*/
- int getLogReplication() throws Exception {
+ int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
if (repl instanceof Integer) {
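
getLogReplication() relies on a Method handle obtained once by reflection, since getNumCurrentReplicas() only exists on HDFS builds carrying the HDFS-826 patch. A sketch of how such a probe is typically set up; the class and method names below are illustrative, mirroring the getNumCurrentReplicas field used above.

    import java.io.OutputStream;
    import java.lang.reflect.Method;

    class ReplicationProbe {
      // Probe once at startup: patched HDFS exposes getNumCurrentReplicas()
      // on the concrete output stream class; older builds do not.
      static Method probeGetNumCurrentReplicas(OutputStream os) {
        try {
          Method m = os.getClass().getMethod("getNumCurrentReplicas");
          m.setAccessible(true);
          return m;
        } catch (NoSuchMethodException e) {
          return null; // running against an HDFS without the patch
        }
      }
    }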
@@ -1030,6 +1055,11 @@ public class HLog implements Syncable {
if (!this.enabled) {
return;
}
+ if (!this.logEntryVisitors.isEmpty()) {
+ for (LogEntryVisitor visitor : this.logEntryVisitors) {
+ visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+ }
+ }
try {
long now = System.currentTimeMillis();
this.writer.append(new HLog.Entry(logKey, logEdit));
@@ -1181,8 +1211,16 @@ public class HLog implements Syncable {
srcDir.toString());
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
- LOG.info("Spliting is done. Removing old log dir "+srcDir);
- fs.delete(srcDir, false);
+ FileStatus[] files = fs.listStatus(srcDir);
+ for(FileStatus file : files) {
+ Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+ LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
+ FSUtils.getPath(newPath));
+ fs.rename(file.getPath(), newPath);
+ }
+ LOG.debug("Moved " + files.length + " log files to " +
+ FSUtils.getPath(oldLogDir));
+ fs.delete(srcDir, true);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1442,7 +1480,7 @@ public class HLog implements Syncable {
NamingThreadFactory f = new NamingThreadFactory(
"SplitWriter-%1$d", Executors.defaultThreadFactory());
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
- for (final byte[] region : splitLogsMap.keySet()) {
+ for (final byte [] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
@@ -1562,17 +1600,19 @@ public class HLog implements Syncable {
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry: entries) {
if (wap == null) {
- Path logFile = getRegionLogPath(logEntry, rootDir);
- if (fs.exists(logFile)) {
- LOG.warn("Found existing old hlog file. It could be the result of a previous" +
- "failed split attempt. Deleting " + logFile +
- ", length=" + fs.getFileStatus(logFile).getLen());
- fs.delete(logFile, false);
+ Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+ if (fs.exists(regionedits)) {
+ LOG.warn("Found existing old edits file. It could be the " +
+ "result of a previous failed split attempt. Deleting " +
+ regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
+ if (!fs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old " + regionedits);
+ }
}
- Writer w = createWriter(fs, logFile, conf);
- wap = new WriterAndPath(logFile, w);
+ Writer w = createWriter(fs, regionedits, conf);
+ wap = new WriterAndPath(regionedits, w);
logWriters.put(region, wap);
- LOG.debug("Creating writer path=" + logFile +
+ LOG.debug("Creating writer path=" + regionedits +
" region=" + Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
@@ -1626,19 +1666,116 @@ public class HLog implements Syncable {
}
}
- private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
- Path tableDir =
- HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
- Path regionDir =
- HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
- return new Path(regionDir, RECOVERED_EDITS);
+ /*
+ * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+ * <code>logEntry</code> named for the sequenceid in the passed
+ * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+ * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+ * creating it if necessary.
+ * @param fs
+ * @param logEntry
+ * @param rootDir HBase root dir.
+ * @return Path to file into which to dump split log edits.
+ * @throws IOException
+ */
+ private static Path getRegionSplitEditsPath(final FileSystem fs,
+ final Entry logEntry, final Path rootDir)
+ throws IOException {
+ Path tableDir = HTableDescriptor.getTableDir(rootDir,
+ logEntry.getKey().getTablename());
+ Path regiondir = HRegion.getRegionDir(tableDir,
+ HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+ Path dir = getRegionDirRecoveredEditsDir(regiondir);
+ if (!fs.exists(dir)) {
+ if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+ }
+ return new Path(dir,
+ formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
}
+ static String formatRecoveredEditsFileName(final long seqid) {
+ return String.format("%019d", seqid);
+ }
+
+ /**
+ * Returns sorted set of edit files made by wal-log splitter.
+ * @param fs
+ * @param regiondir
+ * @return Files in passed <code>regiondir</code> as a sorted set.
+ * @throws IOException
+ */
+ public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+ final Path regiondir)
+ throws IOException {
+ Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+ FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+ @Override
+ public boolean accept(Path p) {
+ boolean result = false;
+ try {
+ // Return files and only files that match the editfile names pattern.
+ // There can be other files in this directory other than edit files.
+ // In particular, on error, we'll move aside the bad edit file giving
+ // it a timestamp suffix. See moveAsideBadEditsFile.
+ Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+ result = fs.isFile(p) && m.matches();
+ } catch (IOException e) {
+ LOG.warn("Failed isFile check on " + p);
+ }
+ return result;
+ }
+ });
+ NavigableSet<Path> filesSorted = new TreeSet<Path>();
+ if (files == null) return filesSorted;
+ for (FileStatus status: files) {
+ filesSorted.add(status.getPath());
+ }
+ return filesSorted;
+ }
+ /**
+ * Move aside a bad edits file.
+ * @param fs
+ * @param edits Edits file to move aside.
+ * @return The name of the moved aside file.
+ * @throws IOException
+ */
+ public static Path moveAsideBadEditsFile(final FileSystem fs,
+ final Path edits)
+ throws IOException {
+ Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+ System.currentTimeMillis());
+ if (!fs.rename(edits, moveAsideName)) {
+ LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+ }
+ return moveAsideName;
+ }
+ /**
+ * @param regiondir This regions directory in the filesystem.
+ * @return The directory that holds recovered edits files for the region
+ * <code>regiondir</code>
+ */
+ public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+ return new Path(regiondir, RECOVERED_EDITS_DIR);
+ }
+ /**
+ * Add a visitor that will be called on each log entry just before it is
+ * written out.
+ * @param visitor the visitor to add
+ */
+ public void addLogEntryVisitor(LogEntryVisitor visitor) {
+ this.logEntryVisitors.add(visitor);
+ }
+ /**
+ * Remove a previously added log entry visitor.
+ * @param visitor the visitor to remove
+ */
+ public void removeLogEntryVisitor(LogEntryVisitor visitor) {
+ this.logEntryVisitors.remove(visitor);
+ }
public void addLogActionsListerner(LogActionsListener list) {
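
The zero-padded names produced by formatRecoveredEditsFileName are what make the TreeSet in getSplitEditFilesSorted return edits in sequenceid order. A small self-contained check of that property; the class name and sequenceid values are arbitrary.

    import java.util.TreeSet;

    public class EditsNameSortSketch {
      public static void main(String [] args) {
        TreeSet<String> names = new TreeSet<String>();
        for (long seqid : new long [] {2332, 7, 104}) {
          // Same format as formatRecoveredEditsFileName: pad to 19 digits so
          // lexicographic order equals numeric sequenceid order.
          names.add(String.format("%019d", seqid));
        }
        System.out.println(names);
        // [0000000000000000007, 0000000000000000104, 0000000000000002332]
      }
    }

Note these names also match EDITFILES_NAME_PATTERN ("-?[0-9]+"), while the timestamp suffix appended by moveAsideBadEditsFile deliberately does not, so moved-aside files drop out of the sorted set.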
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Fri Aug 27 05:01:02 2010
@@ -46,7 +46,6 @@ public class HLogKey implements Writable
private long writeTime;
private byte clusterId;
- private int scope;
/** Writable Constructor -- Do not use. */
public HLogKey() {
@@ -70,7 +69,6 @@ public class HLogKey implements Writable
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
- this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
}
//////////////////////////////////////////////////////////////////////////////
@@ -119,22 +117,6 @@ public class HLogKey implements Writable
this.clusterId = clusterId;
}
- /**
- * Get the replication scope of this key
- * @return replication scope
- */
- public int getScope() {
- return this.scope;
- }
-
- /**
- * Set the replication scope of this key
- * @param scope The new scope
- */
- public void setScope(int scope) {
- this.scope = scope;
- }
-
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -158,7 +140,6 @@ public class HLogKey implements Writable
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId;
- result ^= this.scope;
return result;
}
@@ -187,7 +168,6 @@ public class HLogKey implements Writable
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
out.writeByte(this.clusterId);
- out.writeInt(this.scope);
}
public void readFields(DataInput in) throws IOException {
@@ -197,7 +177,6 @@ public class HLogKey implements Writable
this.writeTime = in.readLong();
try {
this.clusterId = in.readByte();
- this.scope = in.readInt();
} catch(EOFException e) {
// Means it's an old key, just continue
}
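
The try/catch around the clusterId read above is the usual idiom for evolving a Writable without a version field, and removing the scope read from inside that block keeps the newest field last. A sketch of the same idea on a hypothetical key class, not the actual HLogKey layout:

    import java.io.DataInput;
    import java.io.EOFException;
    import java.io.IOException;

    // Hypothetical Writable-style key: fields added in newer versions are
    // read last, inside a try/catch, so records written by an older version
    // simply stop early and leave the defaults in place.
    class VersionTolerantKey {
      long logSeqNum;
      byte clusterId = 0; // default kept when deserializing an old record

      void readFields(DataInput in) throws IOException {
        this.logSeqNum = in.readLong(); // present in every version
        try {
          this.clusterId = in.readByte(); // added later; may be absent
        } catch (EOFException e) {
          // Old key: the stream ended before the newer field. Continue.
        }
      }
    }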
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java Fri Aug 27 05:01:02 2010
@@ -33,11 +33,4 @@ public interface LogActionsListener {
* @param newFile the path to the new hlog
*/
public void logRolled(Path newFile);
-
- /**
- * Notify that the following log moved
- * @param oldPath the old path
- * @param newPath the new path
- */
- public void logArchived(Path oldPath, Path newPath);
}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+public interface LogEntryVisitor {
+
+ /**
+ * Called just before a log entry is written out.
+ * @param info region the entry belongs to
+ * @param logKey key of the entry being written
+ * @param logEdit the edit being written
+ */
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit);
+}
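
A minimal sketch of a visitor and its registration through the new addLogEntryVisitor hook; the counting behavior is made up, but the callback is invoked before each append, as the visitLogEntryBeforeWrite call in the HLog hunk above shows.

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class CountingLogEntryVisitor implements LogEntryVisitor {
      // Visitors sit on the write path of every WAL entry and may be called
      // concurrently, so keep them cheap and thread-safe.
      private final AtomicLong entries = new AtomicLong();

      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {
        entries.incrementAndGet();
      }

      public long getEntries() {
        return entries.get();
      }
    }

    // Registration, assuming hlog is an existing HLog instance:
    //   hlog.addLogEntryVisitor(new CountingLogEntryVisitor());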