You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2008/02/25 20:50:03 UTC
svn commit: r630968 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/regionserver/
Author: bryanduxbury
Date: Mon Feb 25 11:50:02 2008
New Revision: 630968
URL: http://svn.apache.org/viewvc?rev=630968&view=rev
Log:
HBASE-442 Move internal classes out of HRegionServer
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=630968&r1=630967&r2=630968&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Feb 25 11:50:02 2008
@@ -65,7 +65,8 @@
HBASE-457 Factor Master into Master, RegionManager, and ServerManager
HBASE-464 HBASE-419 introduced javadoc errors
HBASE-468 Move HStoreKey back to o.a.h.h
-
+ HBASE-442 Move internal classes out of HRegionServer
+
Branch 0.1
INCOMPATIBLE CHANGES
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,201 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Compact region on request and then run split if appropriate
+ */
+class CompactSplitThread extends Thread
+implements RegionUnavailableListener, HConstants {
+ static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
+
+ private HTable root = null;
+ private HTable meta = null;
+ private long startTime;
+ private final long frequency;
+
+ private HRegionServer server;
+ private HBaseConfiguration conf;
+
+ private final BlockingQueue<QueueEntry> compactionQueue =
+ new LinkedBlockingQueue<QueueEntry>();
+
+ /** constructor */
+ public CompactSplitThread(HRegionServer server) {
+ super();
+ this.server = server;
+ this.conf = server.conf;
+ this.frequency =
+ conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
+ 20 * 1000);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run() {
+ while (!server.isStopRequested()) {
+ QueueEntry e = null;
+ try {
+ e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+ if (e == null) {
+ continue;
+ }
+ e.getRegion().compactIfNeeded();
+ split(e.getRegion());
+ } catch (InterruptedException ex) {
+ continue;
+ } catch (IOException ex) {
+ LOG.error("Compaction failed" +
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+ RemoteExceptionHandler.checkIOException(ex));
+ if (!server.checkFileSystem()) {
+ break;
+ }
+
+ } catch (Exception ex) {
+ LOG.error("Compaction failed" +
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+ ex);
+ if (!server.checkFileSystem()) {
+ break;
+ }
+ }
+ }
+ LOG.info(getName() + " exiting");
+ }
+
+ /**
+ * @param e QueueEntry for region to be compacted
+ */
+ public void compactionRequested(QueueEntry e) {
+ compactionQueue.add(e);
+ }
+
+ void compactionRequested(final HRegion r) {
+ compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
+ }
+
+ private void split(final HRegion region) throws IOException {
+ final HRegionInfo oldRegionInfo = region.getRegionInfo();
+ final HRegion[] newRegions = region.splitRegion(this);
+ if (newRegions == null) {
+ // Didn't need to be split
+ return;
+ }
+
+ // When a region is split, the META table needs to updated if we're
+ // splitting a 'normal' region, and the ROOT table needs to be
+ // updated if we are splitting a META region.
+ HTable t = null;
+ if (region.getRegionInfo().isMetaTable()) {
+ // We need to update the root region
+ if (this.root == null) {
+ this.root = new HTable(conf, ROOT_TABLE_NAME);
+ }
+ t = root;
+ } else {
+ // For normal regions we need to update the meta region
+ if (meta == null) {
+ meta = new HTable(conf, META_TABLE_NAME);
+ }
+ t = meta;
+ }
+ LOG.info("Updating " + t.getTableName() + " with region split info");
+
+ // Mark old region as offline and split in META.
+ // NOTE: there is no need for retry logic here. HTable does it for us.
+ oldRegionInfo.setOffline(true);
+ oldRegionInfo.setSplit(true);
+ BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
+ update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
+ update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
+ update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
+ t.commit(update);
+
+ // Add new regions to META
+ for (int i = 0; i < newRegions.length; i++) {
+ update = new BatchUpdate(newRegions[i].getRegionName());
+ update.put(COL_REGIONINFO, Writables.getBytes(
+ newRegions[i].getRegionInfo()));
+ t.commit(update);
+ }
+
+ // Now tell the master about the new regions
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reporting region split to master");
+ }
+ server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
+ newRegions[1].getRegionInfo());
+ LOG.info("region split, META updated, and report to master all" +
+ " successful. Old region=" + oldRegionInfo.toString() +
+ ", new regions: " + newRegions[0].toString() + ", " +
+ newRegions[1].toString() + ". Split took " +
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+
+ // Do not serve the new regions. Let the Master assign them.
+ }
+
+ /** {@inheritDoc} */
+ public void closing(final Text regionName) {
+ startTime = System.currentTimeMillis();
+ server.getWriteLock().lock();
+ try {
+ // Remove region from regions Map and add it to the Map of retiring
+ // regions.
+ server.setRegionClosing(regionName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(regionName.toString() + " closing (" +
+ "Adding to retiringRegions)");
+ }
+ } finally {
+ server.getWriteLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void closed(final Text regionName) {
+ server.getWriteLock().lock();
+ try {
+ server.setRegionClosed(regionName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(regionName.toString() + " closed");
+ }
+ } finally {
+ server.getWriteLock().unlock();
+ }
+ }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.ConcurrentModificationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+
+/**
+ * Thread that flushes region caches. A region is queued for an immediate
+ * flush through {@link #flushRequested(HRegion)}. After this thread flushes a
+ * region it schedules the region again, to come due after the period
+ * configured by <code>hbase.regionserver.optionalcacheflushinterval</code>
+ * (milliseconds).
+ */
+class Flusher extends Thread implements CacheFlushListener {
+  static final Log LOG = LogFactory.getLog(Flusher.class);
+  private final DelayQueue<QueueEntry> flushQueue =
+    new DelayQueue<QueueEntry>();
+
+  private final long optionalFlushPeriod;
+  private final HRegionServer server;
+  private final HBaseConfiguration conf;
+  // Held for the duration of each flush so that interruptPolitely() can only
+  // deliver its interrupt between flushes.
+  private final Object lock = new Object();
+
+  /**
+   * @param server the region server this thread flushes for; its
+   * configuration provides the optional flush period
+   */
+  public Flusher(final HRegionServer server) {
+    super();
+    this.server = server;
+    conf = server.conf;
+    this.optionalFlushPeriod = conf.getLong(
+      "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
+  }
+
+  /**
+   * Flushes queued regions as they come due until the server asks to stop,
+   * or until a failure is followed by a failing file system check. After
+   * each flush the queue is brought in line with the set of regions returned
+   * by the server's <code>getRegionsToCheck()</code>.
+   */
+  @Override
+  public void run() {
+    while (!server.isStopRequested()) {
+      // Only ever refers to the entry being flushed, so the failure messages
+      // below name the right region.
+      QueueEntry e = null;
+      try {
+        e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (e == null) {
+          continue;
+        }
+        synchronized (lock) { // Don't interrupt while we're working
+          if (e.getRegion().flushcache()) {
+            server.compactionRequested(e);
+          }
+
+          e.setExpirationTime(System.currentTimeMillis() +
+            optionalFlushPeriod);
+          // flushRequested() may have queued this region again while the
+          // flush was running. Adding unconditionally would leave two
+          // entries for the region, and the clean-up loop below only drops
+          // entries for regions that are no longer active.
+          synchronized (flushQueue) {
+            if (!flushQueue.contains(e)) {
+              flushQueue.add(e);
+            }
+          }
+        }
+
+        // Now ensure that all the active regions are in the queue
+        Set<HRegion> regions = server.getRegionsToCheck();
+        for (HRegion r: regions) {
+          QueueEntry candidate =
+            new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
+          synchronized (flushQueue) {
+            if (!flushQueue.contains(candidate)) {
+              flushQueue.add(candidate);
+            }
+          }
+        }
+
+        // Now make sure that the queue only contains active regions
+        synchronized (flushQueue) {
+          for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
+            QueueEntry queued = i.next();
+            if (!regions.contains(queued.getRegion())) {
+              i.remove();
+            }
+          }
+        }
+      } catch (InterruptedException ex) {
+        // Not re-asserting the interrupt: the interrupt is only used to wake
+        // this thread so that it re-tests the stop flag.
+        continue;
+      } catch (ConcurrentModificationException ex) {
+        continue;
+      } catch (DroppedSnapshotException ex) {
+        // Cache flush can fail in a few places. If it fails in a critical
+        // section, we get a DroppedSnapshotException and a replay of hlog
+        // is required. Currently the only way to do this is a restart of
+        // the server.
+        LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+        if (!server.checkFileSystem()) {
+          break;
+        }
+        server.stop();
+      } catch (IOException ex) {
+        LOG.error("Cache flush failed" +
+          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          RemoteExceptionHandler.checkIOException(ex));
+        if (!server.checkFileSystem()) {
+          break;
+        }
+      } catch (Exception ex) {
+        LOG.error("Cache flush failed" +
+          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          ex);
+        if (!server.checkFileSystem()) {
+          break;
+        }
+      }
+    }
+    flushQueue.clear();
+    LOG.info(getName() + " exiting");
+  }
+
+  /**
+   * Queues the region with an expiration time of now, replacing an entry
+   * already queued for the same region.
+   * @param region the region whose cache should be flushed
+   */
+  public void flushRequested(HRegion region) {
+    QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+    synchronized (flushQueue) {
+      // remove() does nothing when no equal entry is queued.
+      flushQueue.remove(e);
+      flushQueue.add(e);
+    }
+  }
+
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   * Blocks while a flush holds the lock.
+   */
+  void interruptPolitely() {
+    synchronized (lock) {
+      interrupt();
+    }
+  }
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=630968&r1=630967&r2=630968&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Feb 25 11:50:02 2008
@@ -185,415 +185,18 @@
}
- /** Queue entry passed to flusher, compactor and splitter threads */
- class QueueEntry implements Delayed {
- private final HRegion region;
- private long expirationTime;
-
- QueueEntry(HRegion region, long expirationTime) {
- this.region = region;
- this.expirationTime = expirationTime;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object o) {
- QueueEntry other = (QueueEntry) o;
- return this.hashCode() == other.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- return this.region.getRegionInfo().hashCode();
- }
-
- /** {@inheritDoc} */
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.expirationTime - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- public int compareTo(Delayed o) {
- long delta = this.getDelay(TimeUnit.MILLISECONDS) -
- o.getDelay(TimeUnit.MILLISECONDS);
-
- int value = 0;
- if (delta > 0) {
- value = 1;
-
- } else if (delta < 0) {
- value = -1;
- }
- return value;
- }
-
- /** @return the region */
- public HRegion getRegion() {
- return region;
- }
-
- /** @param expirationTime the expirationTime to set */
- public void setExpirationTime(long expirationTime) {
- this.expirationTime = expirationTime;
- }
- }
-
// Compactions
final CompactSplitThread compactSplitThread;
- // Needed during shutdown so we send an interrupt after completion of a
- // compaction, not in the midst.
- final Integer compactSplitLock = new Integer(0);
-
- /** Compact region on request and then run split if appropriate
- */
- private class CompactSplitThread extends Thread
- implements RegionUnavailableListener {
- private HTable root = null;
- private HTable meta = null;
- private long startTime;
- private final long frequency;
-
- private final BlockingQueue<QueueEntry> compactionQueue =
- new LinkedBlockingQueue<QueueEntry>();
-
- /** constructor */
- public CompactSplitThread() {
- super();
- this.frequency =
- conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
- 20 * 1000);
- }
-
- /** {@inheritDoc} */
- @Override
- public void run() {
- while (!stopRequested.get()) {
- QueueEntry e = null;
- try {
- e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
- if (e == null) {
- continue;
- }
- e.getRegion().compactIfNeeded();
- split(e.getRegion());
- } catch (InterruptedException ex) {
- continue;
- } catch (IOException ex) {
- LOG.error("Compaction failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- RemoteExceptionHandler.checkIOException(ex));
- if (!checkFileSystem()) {
- break;
- }
-
- } catch (Exception ex) {
- LOG.error("Compaction failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- ex);
- if (!checkFileSystem()) {
- break;
- }
- }
- }
- LOG.info(getName() + " exiting");
- }
-
- /**
- * @param e QueueEntry for region to be compacted
- */
- public void compactionRequested(QueueEntry e) {
- compactionQueue.add(e);
- }
-
- void compactionRequested(final HRegion r) {
- compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
- }
-
- private void split(final HRegion region) throws IOException {
- final HRegionInfo oldRegionInfo = region.getRegionInfo();
- final HRegion[] newRegions = region.splitRegion(this);
- if (newRegions == null) {
- // Didn't need to be split
- return;
- }
-
- // When a region is split, the META table needs to updated if we're
- // splitting a 'normal' region, and the ROOT table needs to be
- // updated if we are splitting a META region.
- HTable t = null;
- if (region.getRegionInfo().isMetaTable()) {
- // We need to update the root region
- if (this.root == null) {
- this.root = new HTable(conf, ROOT_TABLE_NAME);
- }
- t = root;
- } else {
- // For normal regions we need to update the meta region
- if (meta == null) {
- meta = new HTable(conf, META_TABLE_NAME);
- }
- t = meta;
- }
- LOG.info("Updating " + t.getTableName() + " with region split info");
- // Mark old region as offline and split in META.
- // NOTE: there is no need for retry logic here. HTable does it for us.
- oldRegionInfo.setOffline(true);
- oldRegionInfo.setSplit(true);
- BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
- update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
- update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
- update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
- t.commit(update);
-
- // Add new regions to META
- for (int i = 0; i < newRegions.length; i++) {
- update = new BatchUpdate(newRegions[i].getRegionName());
- update.put(COL_REGIONINFO, Writables.getBytes(
- newRegions[i].getRegionInfo()));
- t.commit(update);
- }
-
- // Now tell the master about the new regions
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reporting region split to master");
- }
- reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
- newRegions[1].getRegionInfo());
- LOG.info("region split, META updated, and report to master all" +
- " successful. Old region=" + oldRegionInfo.toString() +
- ", new regions: " + newRegions[0].toString() + ", " +
- newRegions[1].toString() + ". Split took " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-
- // Do not serve the new regions. Let the Master assign them.
- }
-
- /** {@inheritDoc} */
- public void closing(final Text regionName) {
- startTime = System.currentTimeMillis();
- lock.writeLock().lock();
- try {
- // Remove region from regions Map and add it to the Map of retiring
- // regions.
- retiringRegions.put(regionName, onlineRegions.remove(regionName));
- if (LOG.isDebugEnabled()) {
- LOG.debug(regionName.toString() + " closing (" +
- "Adding to retiringRegions)");
- }
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- public void closed(final Text regionName) {
- lock.writeLock().lock();
- try {
- retiringRegions.remove(regionName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(regionName.toString() + " closed");
- }
- } finally {
- lock.writeLock().unlock();
- }
- }
- }
-
// Cache flushing
final Flusher cacheFlusher;
- // Needed during shutdown so we send an interrupt after completion of a
- // flush, not in the midst.
- final Integer cacheFlusherLock = new Integer(0);
- /** Flush cache upon request */
- class Flusher extends Thread implements CacheFlushListener {
- private final DelayQueue<QueueEntry> flushQueue =
- new DelayQueue<QueueEntry>();
-
- private final long optionalFlushPeriod;
-
- /** constructor */
- public Flusher() {
- super();
- this.optionalFlushPeriod = conf.getLong(
- "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
-
- }
-
- /** {@inheritDoc} */
- @Override
- public void run() {
- while (!stopRequested.get()) {
- QueueEntry e = null;
- try {
- e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- if (e == null) {
- continue;
- }
- synchronized(cacheFlusherLock) { // Don't interrupt while we're working
- if (e.getRegion().flushcache()) {
- compactSplitThread.compactionRequested(e);
- }
-
- e.setExpirationTime(System.currentTimeMillis() +
- optionalFlushPeriod);
- flushQueue.add(e);
- }
-
- // Now insure that all the active regions are in the queue
-
- Set<HRegion> regions = getRegionsToCheck();
- for (HRegion r: regions) {
- e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
- synchronized (flushQueue) {
- if (!flushQueue.contains(e)) {
- flushQueue.add(e);
- }
- }
- }
-
- // Now make sure that the queue only contains active regions
-
- synchronized (flushQueue) {
- for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
- e = i.next();
- if (!regions.contains(e.getRegion())) {
- i.remove();
- }
- }
- }
- } catch (InterruptedException ex) {
- continue;
-
- } catch (ConcurrentModificationException ex) {
- continue;
-
- } catch (DroppedSnapshotException ex) {
- // Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
- // is required. Currently the only way to do this is a restart of
- // the server.
- LOG.fatal("Replay of hlog required. Forcing server restart", ex);
- if (!checkFileSystem()) {
- break;
- }
- HRegionServer.this.stop();
-
- } catch (IOException ex) {
- LOG.error("Cache flush failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- RemoteExceptionHandler.checkIOException(ex));
- if (!checkFileSystem()) {
- break;
- }
-
- } catch (Exception ex) {
- LOG.error("Cache flush failed" +
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
- ex);
- if (!checkFileSystem()) {
- break;
- }
- }
- }
- flushQueue.clear();
- LOG.info(getName() + " exiting");
- }
-
- /** {@inheritDoc} */
- public void flushRequested(HRegion region) {
- QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
- synchronized (flushQueue) {
- if (flushQueue.contains(e)) {
- flushQueue.remove(e);
- }
- flushQueue.add(e);
- }
- }
- }
-
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected HLog log;
final LogRoller logRoller;
final Integer logRollerLock = new Integer(0);
- /** Runs periodically to determine if the HLog should be rolled */
- class LogRoller extends Thread implements LogRollListener {
- private final Integer rollLock = new Integer(0);
- private final long optionalLogRollInterval;
- private long lastLogRollTime;
- private volatile boolean rollLog;
-
- /** constructor */
- public LogRoller() {
- super();
- this.optionalLogRollInterval = conf.getLong(
- "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
- this.rollLog = false;
- lastLogRollTime = System.currentTimeMillis();
- }
-
- /** {@inheritDoc} */
- @Override
- public void run() {
- while (!stopRequested.get()) {
- while (!rollLog && !stopRequested.get()) {
- long now = System.currentTimeMillis();
- if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
- rollLog = true;
- this.lastLogRollTime = now;
- } else {
- synchronized (rollLock) {
- try {
- rollLock.wait(threadWakeFrequency);
-
- } catch (InterruptedException e) {
- continue;
- }
- }
- }
- }
- if (!rollLog) {
- // There's only two reasons to break out of the while loop.
- // 1. Log roll requested
- // 2. Stop requested
- // so if a log roll was not requested, continue and break out of loop
- continue;
- }
- synchronized (logRollerLock) {
- try {
- LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
- log.rollWriter();
-
- } catch (IOException ex) {
- LOG.error("Log rolling failed",
- RemoteExceptionHandler.checkIOException(ex));
- checkFileSystem();
-
- } catch (Exception ex) {
- LOG.error("Log rolling failed", ex);
- checkFileSystem();
-
- } finally {
- rollLog = false;
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- public void logRollRequested() {
- synchronized (rollLock) {
- rollLog = true;
- rollLock.notifyAll();
- }
- }
- }
-
/**
* Starts a HRegionServer at the default location
* @param conf
@@ -624,13 +227,13 @@
conf.getInt("hbase.master.lease.period", 30 * 1000);
// Cache flushing thread.
- this.cacheFlusher = new Flusher();
+ this.cacheFlusher = new Flusher(this);
// Compaction thread
- this.compactSplitThread = new CompactSplitThread();
+ this.compactSplitThread = new CompactSplitThread(this);
// Log rolling thread
- this.logRoller = new LogRoller();
+ this.logRoller = new LogRoller(this);
// Task thread to process requests from Master
this.worker = new Worker();
@@ -817,12 +420,8 @@
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
- synchronized(cacheFlusherLock) {
- this.cacheFlusher.interrupt();
- }
- synchronized (compactSplitLock) {
- this.compactSplitThread.interrupt();
- }
+ cacheFlusher.interruptPolitely();
+ compactSplitThread.interrupt();
synchronized (logRollerLock) {
this.logRoller.interrupt();
}
@@ -1592,9 +1191,28 @@
}
/** @return the info server */
+ /**
+ * Get the InfoServer this HRegionServer has put up.
+ */
public InfoServer getInfoServer() {
return infoServer;
}
+
+ /**
+ * Check if a stop has been requested.
+ */
+ public boolean isStopRequested() {
+ return stopRequested.get();
+ }
+
+ /** Get the write lock for the server */
+ ReentrantReadWriteLock.WriteLock getWriteLock() {
+ return lock.writeLock();
+ }
+
+ void compactionRequested(QueueEntry e) {
+ compactSplitThread.compactionRequested(e);
+ }
/**
* @return Immutable list of this servers regions.
@@ -1624,6 +1242,16 @@
return getRegion(regionName, false);
}
+ /** Move a region from online to closing. */
+ void setRegionClosing(final Text regionName) {
+ retiringRegions.put(regionName, onlineRegions.remove(regionName));
+ }
+
+ /** Set a region as closed. */
+ void setRegionClosed(final Text regionName) {
+ retiringRegions.remove(regionName);
+ }
+
/**
* Protected utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
@@ -1633,7 +1261,7 @@
* @throws NotServingRegionException
*/
protected HRegion getRegion(final Text regionName,
- final boolean checkRetiringRegions)
+ final boolean checkRetiringRegions)
throws NotServingRegionException {
HRegion region = null;
this.lock.readLock().lock();
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+
+/**
+ * Thread that rolls the server's HLog, either when a roll is requested
+ * through {@link #logRollRequested()} or when the interval configured by
+ * <code>hbase.regionserver.optionallogrollinterval</code> (milliseconds) has
+ * passed since <code>lastLogRollTime</code>.
+ */
+class LogRoller extends Thread implements LogRollListener {
+  static final Log LOG = LogFactory.getLog(LogRoller.class);
+  // Monitor used to wait for, and to signal, roll requests.
+  private final Object rollLock = new Object();
+  private final long optionalLogRollInterval;
+  // Updated only when the interval check triggers a roll.
+  private long lastLogRollTime;
+  private volatile boolean rollLog;
+  private final HRegionServer server;
+  private final HBaseConfiguration conf;
+
+  /**
+   * @param server the region server whose log this thread rolls; its
+   * configuration provides the optional roll interval
+   */
+  public LogRoller(final HRegionServer server) {
+    super();
+    this.server = server;
+    conf = server.conf;
+    this.optionalLogRollInterval = conf.getLong(
+      "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
+    this.rollLog = false;
+    lastLogRollTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Waits until a roll is due or requested, rolls the log while holding the
+   * server's <code>logRollerLock</code>, and repeats until the server asks
+   * to stop.
+   */
+  @Override
+  public void run() {
+    while (!server.isStopRequested()) {
+      while (!rollLog && !server.isStopRequested()) {
+        long now = System.currentTimeMillis();
+        if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
+          rollLog = true;
+          this.lastLogRollTime = now;
+        } else {
+          synchronized (rollLock) {
+            try {
+              // Re-test under the monitor. logRollRequested() sets the flag
+              // and notifies while holding rollLock, so a request that
+              // arrived after the loop test above would otherwise go
+              // unnoticed until the wait timed out.
+              if (!rollLog) {
+                rollLock.wait(server.threadWakeFrequency);
+              }
+            } catch (InterruptedException e) {
+              // Woken early; the enclosing loop re-tests its condition.
+              continue;
+            }
+          }
+        }
+      }
+      if (!rollLog) {
+        // There's only two reasons to break out of the while loop.
+        // 1. Log roll requested
+        // 2. Stop requested
+        // so if a log roll was not requested, continue and break out of loop
+        continue;
+      }
+      synchronized (server.logRollerLock) {
+        try {
+          LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
+          server.getLog().rollWriter();
+        } catch (IOException ex) {
+          LOG.error("Log rolling failed",
+            RemoteExceptionHandler.checkIOException(ex));
+          server.checkFileSystem();
+        } catch (Exception ex) {
+          LOG.error("Log rolling failed", ex);
+          server.checkFileSystem();
+        } finally {
+          rollLog = false;
+        }
+      }
+    }
+    // Same exit message as the flusher and compaction threads.
+    LOG.info(getName() + " exiting");
+  }
+
+  /** Flags that a roll is wanted and wakes the roller thread if it waits. */
+  public void logRollRequested() {
+    synchronized (rollLock) {
+      rollLog = true;
+      rollLock.notifyAll();
+    }
+  }
+}
\ No newline at end of file
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Delayed;
+
+/**
+ * Queue entry passed to flusher, compactor and splitter threads. Pairs a
+ * region with the time at which the entry comes due.
+ */
+class QueueEntry implements Delayed {
+  private final HRegion region;
+  // Time, on the System.currentTimeMillis() clock, at which the remaining
+  // delay reaches zero.
+  private long expirationTime;
+
+  /**
+   * @param region the region this entry refers to
+   * @param expirationTime time in milliseconds, on the
+   * System.currentTimeMillis() clock, at which the entry comes due
+   */
+  QueueEntry(HRegion region, long expirationTime) {
+    this.region = region;
+    this.expirationTime = expirationTime;
+  }
+
+  /**
+   * Two entries are equal when their hash codes match; the expiration time
+   * is not considered.
+   * @return false for null and for any object that is not a QueueEntry
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Guard the cast so null and foreign types compare unequal instead of
+    // throwing.
+    if (!(o instanceof QueueEntry)) {
+      return false;
+    }
+    // NOTE(review): equality rests on the hash code alone, so two regions
+    // whose HRegionInfo hash codes collide count as the same entry. Confirm
+    // whether comparing the HRegionInfo objects directly would be safe.
+    return this.hashCode() == ((QueueEntry) o).hashCode();
+  }
+
+  /** @return the hash code of the region's HRegionInfo */
+  @Override
+  public int hashCode() {
+    return this.region.getRegionInfo().hashCode();
+  }
+
+  /**
+   * @param unit the unit to express the result in
+   * @return time left until expiration; zero or negative once due
+   */
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(this.expirationTime - System.currentTimeMillis(),
+      TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Orders entries by remaining delay, shortest first.
+   * @return -1, 0 or 1
+   */
+  public int compareTo(Delayed o) {
+    long mine = this.getDelay(TimeUnit.MILLISECONDS);
+    long theirs = o.getDelay(TimeUnit.MILLISECONDS);
+    // Compare directly; a subtraction could overflow for extreme delays.
+    if (mine > theirs) {
+      return 1;
+    }
+    if (mine < theirs) {
+      return -1;
+    }
+    return 0;
+  }
+
+  /** @return the region */
+  public HRegion getRegion() {
+    return region;
+  }
+
+  /** @param expirationTime the expirationTime to set */
+  public void setExpirationTime(long expirationTime) {
+    this.expirationTime = expirationTime;
+  }
+}
\ No newline at end of file