Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/19 14:50:17 UTC
svn commit: r1387563 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/replication/ tes...
Author: mbautin
Date: Wed Sep 19 12:50:17 2012
New Revision: 1387563
URL: http://svn.apache.org/viewvc?rev=1387563&view=rev
Log:
[master] [89-fb] Remove push-style replication
Author: mbautin
Summary: Removing the "push-style" replication (pushing edits from the RS to the destination cluster) because we are using a different replication approach.
Test Plan: Unit tests
Reviewers: kannan, nspiegelberg, mycnyc, amirshim
Reviewed By: kannan
CC: Liyin, Karthik
Differential Revision: https://reviews.facebook.net/D5445
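
For context, the call path being removed looked roughly like the sketch below: a region-server-side source picked a peer region server and pushed batches of WAL entries over the replicateLogEntries RPC that this change deletes from HRegionInterface. This is a minimal illustration assembled from the removed code shown in the diff (it only makes sense against the pre-change 0.89-fb tree); the configuration, peer address, and entry batch are assumed to be supplied by the caller.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

public class PushReplicationSketch {
  /** Ship one batch of WAL entries to a single peer region server. */
  static void shipOnce(Configuration conf, HServerAddress peer,
      HLog.Entry[] entries) throws IOException {
    HConnection conn = HConnectionManager.getConnection(conf);
    HRegionInterface rrs = conn.getHRegionConnection(peer);
    // This RPC is the one removed from HRegionInterface below; on the
    // receiving side it ended up in ReplicationSink.replicateEntries().
    rrs.replicateLogEntries(entries);
  }
}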
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Sep 19 12:50:17 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
@@ -39,7 +38,6 @@ import org.apache.hadoop.hbase.client.Ro
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.AssignmentPlan;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.io.MapWritable;
/**
@@ -376,17 +374,6 @@ public interface HRegionInterface extend
byte[] regionName, byte[] familyName, boolean assignSeqNum) throws IOException;
/**
- * Replicates the given entries. The guarantee is that the given entries
- * will be durable on the slave cluster if this method returns without
- * any exception.
- * hbase.replication has to be set to true for this to work.
- *
- * @param entries entries to replicate
- * @throws IOException
- */
- public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
-
- /**
* Closes the specified region.
* @param hri region to be closed
* @param reportWhenCompleted whether to report to master
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Sep 19 12:50:17 2012
@@ -129,7 +129,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DaemonThreadFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -311,10 +310,6 @@ public class HRegionServer implements HR
private final String machineName;
- // Replication-related attributes
- private Replication replicationHandler;
- // End of replication
-
private final AtomicLong globalMemstoreSize = new AtomicLong(0);
// reference to the Thrift Server.
@@ -1276,18 +1271,14 @@ public class HRegionServer implements HR
"running at " + this.serverInfo.getServerName() +
" because logdir " + logdir.toString() + " exists");
}
- this.replicationHandler = new Replication(this.conf, this.zooKeeperWrapper,
- this.serverInfo, this.fs, oldLogDir, stopRequested);
HLog log = instantiateHLog(logdir, oldLogDir);
- this.replicationHandler.addLogEntryVisitor(log);
return log;
}
// instantiate
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
- this.replicationHandler.getReplicationManager(),
- this.serverInfo.getServerAddress().toString());
+ null, this.serverInfo.getServerAddress().toString());
}
@@ -1492,8 +1483,6 @@ public class HRegionServer implements HR
}
}
- this.replicationHandler.startReplicationServices();
-
if (this.server == null) {
// Start Server to handle client requests.
this.server = HBaseRPC.getServer(this,
@@ -1664,9 +1653,6 @@ public class HRegionServer implements HR
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.hlogRoller);
this.compactSplitThread.join();
- if (replicationHandler != null) {
- this.replicationHandler.join();
- }
}
private boolean getMaster() {
@@ -3353,11 +3339,6 @@ public class HRegionServer implements HR
}
}
- @Override
- public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
- this.replicationHandler.replicateLogEntries(entries);
- }
-
/**
* Do class main.
* @param args
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Sep 19 12:50:17 2012
@@ -1,162 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-/**
- * Replication serves as an umbrella over the setup of replication and
- * is used by HRS.
- */
-public class Replication implements LogEntryVisitor {
-
- private final boolean replication;
- private final ReplicationSourceManager replicationManager;
- private boolean replicationMaster;
- private final AtomicBoolean replicating = new AtomicBoolean(true);
- private final ReplicationZookeeperWrapper zkHelper;
- private final Configuration conf;
- private final AtomicBoolean stopRequested;
- private ReplicationSink replicationSink;
-
- /**
- * Instantiate the replication management (if rep is enabled).
- * @param conf conf to use
- * @param zkWrapper the ZooKeeperWrapper of this region server
- * @param hsi the info of this region server
- * @param fs handle to the filesystem
- * @param oldLogDir directory where logs are archived
- * @param stopRequested boolean that tells us if we are shutting down
- * @throws IOException
- */
- public Replication(Configuration conf, ZooKeeperWrapper zkWrapper,
- HServerInfo hsi, FileSystem fs, Path oldLogDir,
- AtomicBoolean stopRequested) throws IOException {
- this.conf = conf;
- this.stopRequested = stopRequested;
- this.replication =
- conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
- if (replication) {
- this.zkHelper = new ReplicationZookeeperWrapper(zkWrapper, conf,
- this.replicating, hsi.getServerName());
- this.replicationMaster = zkHelper.isReplicationMaster();
- this.replicationManager = this.replicationMaster ?
- new ReplicationSourceManager(zkHelper, conf, stopRequested,
- fs, this.replicating, oldLogDir) : null;
- } else {
- replicationManager = null;
- zkHelper = null;
- }
- }
-
- /**
- * Join with the replication threads
- */
- public void join() {
- if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.join();
- }
- this.zkHelper.deleteOwnRSZNode();
- }
- }
-
- /**
- * Carry the given list of log entries down to the sink
- * @param entries list of entries to replicate
- * @throws IOException
- */
- public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
- if (this.replication && !this.replicationMaster) {
- this.replicationSink.replicateEntries(entries);
- }
- }
-
- /**
- * If replication is enabled, starts either the replication source manager
- * (on the master cluster) or the replication sink (on a slave cluster).
- * @throws IOException
- */
- public void startReplicationServices() throws IOException {
- if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.init();
- } else {
- this.replicationSink =
- new ReplicationSink(this.conf, this.stopRequested);
- }
- }
- }
-
- /**
- * Get the replication sources manager
- * @return the manager if replication is enabled, else returns null
- */
- public ReplicationSourceManager getReplicationManager() {
- return replicationManager;
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {
- NavigableMap<byte[], Integer> scopes =
- new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- byte[] family;
- for (KeyValue kv : logEdit.getKeyValues()) {
- family = kv.getFamily();
- int scope = info.getTableDesc().getFamily(family).getScope();
- if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
- !scopes.containsKey(family)) {
- scopes.put(family, scope);
- }
- }
- if (!scopes.isEmpty()) {
- logEdit.setScopes(scopes);
- }
- }
-
- /**
- * Add this class as a log entry visitor for HLog if replication is enabled
- * @param hlog log to add ourselves on as a visitor
- */
- public void addLogEntryVisitor(HLog hlog) {
- if (replication) {
- hlog.addLogEntryVisitor(this);
- }
- }
-}
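
The class removed above was the umbrella that HRegionServer instantiated; the hunks removed from HRegionServer.java earlier in this commit show the wiring. A condensed sketch of that pre-change setup, with the surrounding server fields passed in as parameters for brevity (constructor and method names come from the removed code):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;

class ReplicationWiringSketch {
  static Replication wire(Configuration conf, ZooKeeperWrapper zk, HServerInfo hsi,
      FileSystem fs, Path oldLogDir, AtomicBoolean stop, HLog log) throws IOException {
    Replication handler = new Replication(conf, zk, hsi, fs, oldLogDir, stop);
    handler.addLogEntryVisitor(log);      // tag WAL edits with their replication scopes
    handler.startReplicationServices();   // master: source manager, slave: sink
    return handler;
  }
}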
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Sep 19 12:50:17 2012
@@ -1,648 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.hbase.util.HasThread;
-
-/**
- * Class that handles the source of a replication stream.
- * Currently does not handle more than 1 slave
- * For each slave cluster it selects a random number of peers
- * using a replication ratio. For example, if the replication ratio = 0.1
- * and slave cluster has 100 region servers, 10 will be selected.
- * <p/>
- * A stream is considered down when we cannot contact a region server on the
- * peer cluster for more than 55 seconds by default.
- * <p/>
- *
- */
-public class ReplicationSource extends HasThread
- implements ReplicationSourceInterface {
-
- private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
- // Lock to manage when a HLog is changing path (archiving)
- private final ReentrantLock pathLock = new ReentrantLock();
- // Queue of logs to process
- private PriorityBlockingQueue<Path> queue;
- // container of entries to replicate
- private HLog.Entry[] entriesArray;
- private HConnection conn;
- // Helper class for zookeeper
- private ReplicationZookeeperWrapper zkHelper;
- private Configuration conf;
- // ratio of region servers to chose from a slave cluster
- private float ratio;
- private Random random;
- // should we replicate or not?
- private AtomicBoolean replicating;
- // id of the peer cluster this source replicates to
- private String peerClusterId;
- // The manager of all sources to which we ping back our progress
- private ReplicationSourceManager manager;
- // Should we stop everything?
- private AtomicBoolean stop;
- // List of chosen sinks (region servers)
- private List<HServerAddress> currentPeers;
- // How long should we sleep for each retry
- private long sleepForRetries;
- // Max size in bytes of entriesArray
- private long replicationQueueSizeCapacity;
- // Max number of entries in entriesArray
- private int replicationQueueNbCapacity;
- // Our reader for the current log
- private HLog.Reader reader;
- // Current position in the log
- private long position = 0;
- // Path of the current log
- private volatile Path currentPath;
- private FileSystem fs;
- // id of this cluster
- private byte clusterId;
- // total number of edits we replicated
- private long totalReplicatedEdits = 0;
- // The znode we currently play with
- private String peerClusterZnode;
- // Indicates if this queue is recovered (and will be deleted when depleted)
- private boolean queueRecovered;
- // Maximum number of retries before taking bold actions
- private long maxRetriesMultiplier;
- // Current number of entries that we need to replicate
- private int currentNbEntries = 0;
- // Indicates if this particular source is running
- private volatile boolean running = true;
-
- /**
- * Instantiation method used by region servers
- *
- * @param conf configuration to use
- * @param fs file system to use
- * @param manager replication manager to ping to
- * @param stopper the atomic boolean to use to stop the regionserver
- * @param replicating the atomic boolean that starts/stops replication
- * @param peerClusterZnode the name of our znode
- * @throws IOException
- */
- public void init(final Configuration conf,
- final FileSystem fs,
- final ReplicationSourceManager manager,
- final AtomicBoolean stopper,
- final AtomicBoolean replicating,
- final String peerClusterZnode)
- throws IOException {
- this.stop = stopper;
- this.conf = conf;
- this.replicationQueueSizeCapacity =
- this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
- this.replicationQueueNbCapacity =
- this.conf.getInt("replication.source.nb.capacity", 25000);
- this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
- for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
- this.entriesArray[i] = new HLog.Entry();
- }
- this.maxRetriesMultiplier =
- this.conf.getLong("replication.source.maxretriesmultiplier", 10);
- this.queue =
- new PriorityBlockingQueue<Path>(
- conf.getInt("hbase.regionserver.maxlogs", 32),
- new LogsComparator());
- this.conn = HConnectionManager.getConnection(conf);
- this.zkHelper = manager.getRepZkWrapper();
- this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
- this.currentPeers = new ArrayList<HServerAddress>();
- this.random = new Random();
- this.replicating = replicating;
- this.manager = manager;
- this.sleepForRetries =
- this.conf.getLong("replication.source.sleepforretries", 1000);
- this.fs = fs;
- this.clusterId = Byte.valueOf(zkHelper.getClusterId());
-
- // Finally look if this is a recovered queue
- this.checkIfQueueRecovered(peerClusterZnode);
- }
-
- // The passed znode will be either the id of the peer cluster or
- // the handling story of that queue in the form of id-startcode-*
- private void checkIfQueueRecovered(String peerClusterZnode) {
- String[] parts = peerClusterZnode.split("-");
- this.queueRecovered = parts.length != 1;
- this.peerClusterId = this.queueRecovered ?
- parts[0] : peerClusterZnode;
- this.peerClusterZnode = peerClusterZnode;
- }
-
- /**
- * Select a number of peers at random using the ratio. Minimum 1.
- */
- private void chooseSinks() {
- this.currentPeers.clear();
- List<HServerAddress> addresses =
- this.zkHelper.getPeersAddresses(peerClusterId);
- Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
- int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
- LOG.info("Getting " + nbPeers +
- " rs from peer cluster # " + peerClusterId);
- for (int i = 0; i < nbPeers; i++) {
- HServerAddress address;
- // Make sure we get one address that we don't already have
- do {
- address = addresses.get(this.random.nextInt(addresses.size()));
- } while (setOfAddr.contains(address));
- LOG.info("Choosing peer " + address);
- setOfAddr.add(address);
- }
- this.currentPeers.addAll(setOfAddr);
- }
-
- @Override
- public void enqueueLog(Path log) {
- this.queue.put(log);
- }
-
- @Override
- public void logArchived(Path oldPath, Path newPath) {
- // in sync with queue polling
- this.pathLock.lock();
- try {
- if (oldPath.equals(this.currentPath)) {
- this.currentPath = newPath;
- LOG.debug("Current log moved, changing currentPath to " +newPath);
- return;
- }
-
- boolean present = this.queue.remove(oldPath);
- LOG.debug("old log was " + (present ?
- "present, changing the queue" : "already processed"));
- if (present) {
- this.queue.add(newPath);
- }
- } finally {
- this.pathLock.unlock();
- }
- }
-
- @Override
- public void run() {
- connectToPeers();
- // We were stopped while looping to connect to sinks, just abort
- if (this.stop.get()) {
- return;
- }
- // If this is recovered, the queue is already full and the first log
- // normally has a position (unless the RS failed between 2 logs)
- if (this.queueRecovered) {
- this.position = this.zkHelper.getHLogRepPosition(
- this.peerClusterZnode, this.queue.peek().getName());
- }
- int sleepMultiplier = 1;
- // Loop until we close down
- while (!stop.get() && this.running) {
-
- // In sync with logArchived
- this.pathLock.lock();
- try {
- // Get a new path
- if (!getNextPath()) {
- if (sleepForRetries("No log to process", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- // Open a reader on it
- if (!openReader(sleepMultiplier)) {
- continue;
- }
- } finally {
- this.pathLock.unlock();
- }
-
- // If we got a null reader but didn't continue, then sleep and continue
- if (this.reader == null) {
- if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
-
- boolean gotIOE = false; // TODO this is a hack for HDFS-1057
- currentNbEntries = 0;
- try {
- if(readAllEntriesToReplicateOrNextFile()) {
- continue;
- }
- } catch (IOException ioe) {
- LOG.warn(peerClusterZnode + " Got: ", ioe);
- gotIOE = true;
- if (ioe.getCause() instanceof EOFException) {
-
- boolean considerDumping = false;
- if (this.queueRecovered) {
- try {
- FileStatus stat = this.fs.getFileStatus(this.currentPath);
- if (stat.getLen() == 0) {
- LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
- }
- considerDumping = true;
- } catch (IOException e) {
- LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
- }
- } else if (currentNbEntries != 0) {
- LOG.warn(peerClusterZnode + " Got EOF while reading, " +
- "looks like this file is broken? " + currentPath);
- considerDumping = true;
- currentNbEntries = 0;
- }
-
- if (considerDumping &&
- sleepMultiplier == this.maxRetriesMultiplier &&
- processEndOfFile()) {
- continue;
- }
- }
- } finally {
- try {
- // if currentPath is null, processEndOfFile was called, so there is no position to record
- if (this.currentPath != null && !gotIOE) {
- this.position = this.reader.getPosition();
- }
- if (this.reader != null) {
- this.reader.close();
- }
- } catch (IOException e) {
- gotIOE = true;
- LOG.warn("Unable to finalize the tailing of a file", e);
- }
- }
-
- // If we didn't get anything to replicate, or if we hit a IOE,
- // wait a bit and retry.
- // But if we need to stop, don't bother sleeping
- if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
- if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- sleepMultiplier = 1;
- shipEdits();
-
- }
- LOG.debug("Source exiting " + peerClusterId);
- }
-
- /**
- * Read all the entries from the current log files and retain those
- * that need to be replicated. Else, process the end of the current file.
- * @return true if we got nothing and went to the next file, false if we got
- * entries
- * @throws IOException
- */
- protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
- long seenEntries = 0;
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
- while (entry != null) {
- WALEdit edit = entry.getEdit();
- seenEntries++;
- // Remove all KVs that should not be replicated
- removeNonReplicableEdits(edit);
- HLogKey logKey = entry.getKey();
- // Don't replicate catalog entries, WALEdits that contain nothing to
- // replicate, or anything written while replication is disabled
- if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
- Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
- edit.size() != 0 && replicating.get()) {
- logKey.setClusterId(this.clusterId);
- currentNbEntries++;
- }
- // Stop if too many entries or too big
- if ((this.reader.getPosition() - this.position)
- >= this.replicationQueueSizeCapacity ||
- currentNbEntries >= this.replicationQueueNbCapacity) {
- break;
- }
- entry = this.reader.next(entriesArray[currentNbEntries]);
- }
- LOG.debug("currentNbEntries:" + currentNbEntries +
- " and seenEntries:" + seenEntries +
- " and size: " + (this.reader.getPosition() - this.position));
- // If we didn't get anything and the queue has an object, it means we
- // hit the end of the file for sure
- return seenEntries == 0 && processEndOfFile();
- }
-
- private void connectToPeers() {
- // Connect to peer cluster first, unless we have to stop
- while (!this.stop.get() && this.currentPeers.size() == 0) {
- try {
- chooseSinks();
- Thread.sleep(this.sleepForRetries);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while trying to connect to sinks", e);
- }
- }
- }
-
- /**
- * Poll for the next path
- * @return true if a path was obtained, false if not
- */
- protected boolean getNextPath() {
- try {
- if (this.currentPath == null) {
- this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while reading edits", e);
- }
- return this.currentPath != null;
- }
-
- /**
- * Open a reader on the current path
- *
- * @param sleepMultiplier by how many times the default sleeping time is augmented
- * @return true if we should continue with that file, false if we are over with it
- */
- protected boolean openReader(int sleepMultiplier) {
- try {
- LOG.info("Opening log for replication " + this.currentPath.getName() + " at " + this.position);
- this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
- } catch (IOException ioe) {
- this.reader = null;
- LOG.warn(peerClusterZnode + " Got: ", ioe);
- // TODO Need a better way to determine if a file is really gone but
- // TODO without scanning all logs dir
- if (sleepMultiplier == this.maxRetriesMultiplier) {
- LOG.warn("Waited too long for this file, considering dumping");
- return !processEndOfFile();
- }
- }
- return true;
- }
-
- /**
- * Do the sleeping logic
- * @param msg Why we sleep
- * @param sleepMultiplier by how many times the default sleeping time is augmented
- * @return
- */
- protected boolean sleepForRetries(String msg, int sleepMultiplier) {
- try {
- LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
- Thread.sleep(this.sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping between retries");
- }
- return sleepMultiplier < maxRetriesMultiplier;
- }
-
- /**
- * We only want KVs that are scoped other than local
- * @param edit The KV to check for replication
- */
- protected void removeNonReplicableEdits(WALEdit edit) {
- NavigableMap<byte[], Integer> scopes = edit.getScopes();
- List<KeyValue> kvs = edit.getKeyValues();
- for (int i = 0; i < edit.size(); i++) {
- KeyValue kv = kvs.get(i);
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- if (scopes == null || !scopes.containsKey(kv.getFamily())) {
- kvs.remove(i);
- i--;
- }
- }
- }
-
- /**
- * Do the shipping logic
- */
- protected void shipEdits() {
- int sleepMultiplier = 1;
- while (!stop.get()) {
- try {
- HRegionInterface rrs = getRS();
- LOG.debug("Replicating " + currentNbEntries);
- rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
- this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
- this.totalReplicatedEdits += currentNbEntries;
- LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
- break;
-
- } catch (IOException ioe) {
- LOG.warn("Unable to replicate because ", ioe);
- try {
- boolean down;
- do {
- down = isSlaveDown();
- if (down) {
- LOG.debug("The region server we tried to ping didn't answer, " +
- "sleeping " + sleepForRetries + " times " + sleepMultiplier);
- Thread.sleep(this.sleepForRetries * sleepMultiplier);
- if (sleepMultiplier < maxRetriesMultiplier) {
- sleepMultiplier++;
- } else {
- chooseSinks();
- }
- }
- } while (!stop.get() && down);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while trying to contact the peer cluster");
- }
-
- }
- }
- }
-
- /**
- * If the queue isn't empty, switch to the next one
- * Else if this is a recovered queue, it means we're done!
- * Else we'll just continue to try reading the log file
- * @return true if we're done with the current file, false if we should
- * continue trying to read from it
- */
- protected boolean processEndOfFile() {
- this.pathLock.lock();
- try {
- if (this.queue.size() != 0) {
- this.currentPath = null;
- this.position = 0;
- return true;
- } else if (this.queueRecovered) {
- this.manager.closeRecoveredQueue(this);
- this.abort();
- return true;
- }
- } finally {
- this.pathLock.unlock();
- }
- return false;
- }
-
- public void startup() {
- String n = Thread.currentThread().getName();
- Thread.UncaughtExceptionHandler handler =
- new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.fatal("Set stop flag in " + t.getName(), e);
- abort();
- }
- };
- Threads.setDaemonThreadRunning(
- this, n + ".replicationSource," + clusterId, handler);
- }
-
- /**
- * Hastily stop the replication, then wait for shutdown
- */
- private void abort() {
- LOG.info("abort");
- this.running = false;
- terminate();
- }
-
- public void terminate() {
- LOG.info("terminate");
- Threads.shutdown(this, this.sleepForRetries);
- }
-
- /**
- * Get a new region server at random from this peer
- * @return
- * @throws IOException
- */
- private HRegionInterface getRS() throws IOException {
- if (this.currentPeers.size() == 0) {
- throw new IOException(this.peerClusterZnode + " has 0 region servers");
- }
- HServerAddress address =
- currentPeers.get(random.nextInt(this.currentPeers.size()));
- return this.conn.getHRegionConnection(address);
- }
-
- /**
- * Check if the slave is down by trying to establish a connection
- * @return true if down, false if up
- * @throws InterruptedException
- */
- public boolean isSlaveDown() throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- Thread pingThread = new Thread() {
- public void run() {
- try {
- HRegionInterface rrs = getRS();
- // Dummy call which should fail
- rrs.getHServerInfo();
- latch.countDown();
- } catch (IOException ex) {
- LOG.info("Slave cluster looks down: " + ex.getMessage());
- }
- }
- };
- pingThread.start();
- // awaits returns true if countDown happened
- boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
- pingThread.interrupt();
- return down;
- }
-
- /**
- * Get the id that the source is replicating to
- *
- * @return peer cluster id
- */
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
- }
-
- /**
- * Get the path of the current HLog
- * @return current hlog's path
- */
- public Path getCurrentPath() {
- return this.currentPath;
- }
-
- /**
- * Comparator used to compare logs together based on their start time
- */
- public static class LogsComparator implements Comparator<Path> {
-
- @Override
- public int compare(Path o1, Path o2) {
- return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
- }
-
- @Override
- public boolean equals(Object o) {
- return true;
- }
-
- /**
- * Split a path to get the start time
- * For example: 10.20.20.171%3A60020.1277499063250
- * @param p path to split
- * @return start time
- */
- private long getTS(Path p) {
- String[] parts = p.getName().split("\\.");
- return Long.parseLong(parts[parts.length-1]);
- }
- }
-
-}
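
The sink-selection rule described in the removed class comment and implemented in chooseSinks() picks ceil(n * ratio) region servers from the peer cluster at random, with the ratio read from "replication.source.ratio" (default 0.1). A tiny standalone illustration of that arithmetic, using the numbers from the comment:

class SinkSelectionSketch {
  public static void main(String[] args) {
    int peerRegionServers = 100;   // example from the removed javadoc
    float ratio = 0.1f;            // default of "replication.source.ratio"
    int nbPeers = (int) Math.ceil(peerRegionServers * ratio);
    System.out.println(nbPeers);   // prints 10
  }
}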
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Wed Sep 19 12:50:17 2012
@@ -1,87 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Interface that defines a replication source
- */
-public interface ReplicationSourceInterface {
-
- /**
- * Initializer for the source
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param manager the manager to use
- * @param stopper the stopper object for this region server
- * @param replicating the status of the replication on this cluster
- * @param peerClusterId the id of the peer cluster
- * @throws IOException
- */
- public void init(final Configuration conf,
- final FileSystem fs,
- final ReplicationSourceManager manager,
- final AtomicBoolean stopper,
- final AtomicBoolean replicating,
- final String peerClusterId) throws IOException;
-
- /**
- * Add a log to the list of logs to replicate
- * @param log path to the log to replicate
- */
- public void enqueueLog(Path log);
-
- /**
- * Get the current log that's replicated
- * @return the current log
- */
- public Path getCurrentPath();
-
- /**
- * Start the replication
- */
- public void startup();
-
- /**
- * End the replication
- */
- public void terminate();
-
- /**
- * Get the id that the source is replicating to
- *
- * @return peer cluster id
- */
- public String getPeerClusterZnode();
-
- /**
- * Notify this source that a log was archived
- * @param oldPath old path of the log
- * @param newPath new path of the log (archive)
- */
- public void logArchived(Path oldPath, Path newPath);
-}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Sep 19 12:50:17 2012
@@ -1,353 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class is responsible to manage all the replication
- * sources. There are two classes of sources:
- * <li> Normal sources are persistent and one per peer cluster</li>
- * <li> Old sources are recovered from a failed region server and our
- * only goal is to finish replicating the HLog queue it had up in ZK</li>
- *
- * When a region server dies, this class uses a watcher to get notified and it
- * tries to grab a lock in order to transfer all the queues in a local
- * old source.
- */
-public class ReplicationSourceManager implements LogActionsListener {
-
- private static final Log LOG =
- LogFactory.getLog(ReplicationSourceManager.class);
- // List of all the sources that read this RS's logs
- private final List<ReplicationSourceInterface> sources;
- // List of all the sources we got from died RSs
- private final List<ReplicationSourceInterface> oldsources;
- // Indicates if we are currently replicating
- private final AtomicBoolean replicating;
- // Helper for zookeeper
- private final ReplicationZookeeperWrapper zkHelper;
- // Indicates if the region server is closing
- private final AtomicBoolean stopper;
- // All logs we are currently tracking
- private final SortedSet<String> hlogs;
- private final Configuration conf;
- private final FileSystem fs;
- // The path to the latest log we saw, for new coming sources
- private Path latestPath;
- // List of all the other region servers in this cluster
- private final List<String> otherRegionServers;
- // Path to the hlog archive
- private final Path oldLogDir;
-
- /**
- * Creates a replication manager and sets the watch on all the other
- * registered region servers
- * @param zkHelper the zk helper for replication
- * @param conf the configuration to use
- * @param stopper the stopper object for this region server
- * @param fs the file system to use
- * @param replicating the status of the replication on this cluster
- * @param oldLogDir the directory where old logs are archived
- */
- public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
- final Configuration conf,
- final AtomicBoolean stopper,
- final FileSystem fs,
- final AtomicBoolean replicating,
- final Path oldLogDir) {
- this.sources = new ArrayList<ReplicationSourceInterface>();
- this.replicating = replicating;
- this.zkHelper = zkHelper;
- this.stopper = stopper;
- this.hlogs = new TreeSet<String>();
- this.oldsources = new ArrayList<ReplicationSourceInterface>();
- this.conf = conf;
- this.fs = fs;
- this.oldLogDir = oldLogDir;
- List<String> otherRSs =
- this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
- this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
- }
-
- /**
- * Provide the id of the peer and a log key and this method will figure which
- * hlog it belongs to and will log, for this region server, the current
- * position. It will also clean old logs from the queue.
- * @param log Path to the log currently being replicated
- * @param id id of the peer cluster
- * @param position current location in the log
- * @param queueRecovered indicates if this queue comes from another region server
- */
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
- String key = log.getName();
- LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
- this.zkHelper.writeReplicationStatus(key.toString(), id, position);
- synchronized (this.hlogs) {
- if (!queueRecovered && this.hlogs.first() != key) {
- SortedSet<String> hlogSet = this.hlogs.headSet(key);
- LOG.info("Removing " + hlogSet.size() +
- " logs in the list: " + hlogSet);
- for (String hlog : hlogSet) {
- this.zkHelper.removeLogFromList(hlog.toString(), id);
- }
- hlogSet.clear();
- }
- }
- }
-
- /**
- * Adds a normal source per registered peer cluster and tries to process all
- * old region server hlog queues
- */
- public void init() throws IOException {
- for (String id : this.zkHelper.getPeerClusters().keySet()) {
- ReplicationSourceInterface src = addSource(id);
- src.startup();
- }
- List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
- synchronized (otherRegionServers) {
- LOG.info("Current list of replicators: " + currentReplicators
- + " other RSs: " + otherRegionServers);
- }
- // Look if there's anything to process after a restart
- for (String rs : currentReplicators) {
- synchronized (otherRegionServers) {
- if (!this.otherRegionServers.contains(rs)) {
- transferQueues(rs);
- }
- }
- }
- }
-
- /**
- * Add a new normal source to this region server
- * @param id the id of the peer cluster
- * @return the created source
- * @throws IOException
- */
- public ReplicationSourceInterface addSource(String id) throws IOException {
- ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
- this.sources.add(src);
- synchronized (this.hlogs) {
- if (this.hlogs.size() > 0) {
- this.zkHelper.addLogToList(this.hlogs.first(),
- this.sources.get(0).getPeerClusterZnode());
- src.enqueueLog(this.latestPath);
- }
- }
- return src;
- }
-
- /**
- * Terminate the replication on this region server
- */
- public void join() {
- if (this.sources.size() == 0) {
- this.zkHelper.deleteOwnRSZNode();
- }
- for (ReplicationSourceInterface source : this.sources) {
- source.terminate();
- }
- }
-
- /**
- * Get a copy of the hlogs of the first source on this rs
- * @return a sorted set of hlog names
- */
- protected SortedSet<String> getHLogs() {
- return new TreeSet(this.hlogs);
- }
-
- /**
- * Get a list of all the normal sources of this rs
- * @return list of all sources
- */
- public List<ReplicationSourceInterface> getSources() {
- return this.sources;
- }
-
- @Override
- public void logRolled(Path newLog) {
- if (this.sources.size() > 0) {
- this.zkHelper.addLogToList(newLog.getName(),
- this.sources.get(0).getPeerClusterZnode());
- }
- synchronized (this.hlogs) {
- this.hlogs.add(newLog.getName());
- }
- this.latestPath = newLog;
- for (ReplicationSourceInterface source : this.sources) {
- source.enqueueLog(newLog);
- }
- }
-
- @Override
- public void logArchived(Path oldPath, Path newPath) {
- for (ReplicationSourceInterface source : this.sources) {
- source.logArchived(oldPath, newPath);
- }
- }
-
- /**
- * Get the ZK helper of this manager
- * @return the helper
- */
- public ReplicationZookeeperWrapper getRepZkWrapper() {
- return zkHelper;
- }
-
- /**
- * Factory method to create a replication source
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param manager the manager to use
- * @param stopper the stopper object for this region server
- * @param replicating the status of the replication on this cluster
- * @param peerClusterId the id of the peer cluster
- * @return the created source
- * @throws IOException
- */
- public ReplicationSourceInterface getReplicationSource(
- final Configuration conf,
- final FileSystem fs,
- final ReplicationSourceManager manager,
- final AtomicBoolean stopper,
- final AtomicBoolean replicating,
- final String peerClusterId) throws IOException {
- ReplicationSourceInterface src;
- try {
- Class c = Class.forName(conf.get("replication.replicationsource.implementation",
- ReplicationSource.class.getCanonicalName()));
- src = (ReplicationSourceInterface) c.newInstance();
- } catch (Exception e) {
- LOG.warn("Passed replication source implemention throws errors, " +
- "defaulting to ReplicationSource", e);
- src = new ReplicationSource();
-
- }
- src.init(conf, fs, manager, stopper, replicating, peerClusterId);
- return src;
- }
-
- /**
- * Transfer all the queues of the specified region server to this one.
- * First it tries to grab a lock and if it works it will move the
- * znodes and finally will delete the old znodes.
- *
- * It creates one old source for any type of source of the old rs.
- * @param rsZnode
- */
- public void transferQueues(String rsZnode) {
- // We try to lock that rs' queue directory
- if (this.stopper.get()) {
- LOG.info("Not transferring queue since we are shutting down");
- return;
- }
- if (!this.zkHelper.lockOtherRS(rsZnode)) {
- return;
- }
- LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
- SortedMap<String, SortedSet<String>> newQueues =
- this.zkHelper.copyQueuesFromRS(rsZnode);
- if (newQueues == null || newQueues.size() == 0) {
- return;
- }
- this.zkHelper.deleteRsQueues(rsZnode);
-
- for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
- String peerId = entry.getKey();
- try {
- ReplicationSourceInterface src = getReplicationSource(this.conf,
- this.fs, this, this.stopper, this.replicating, peerId);
- this.oldsources.add(src);
- for (String hlog : entry.getValue()) {
- src.enqueueLog(new Path(this.oldLogDir, hlog));
- }
- src.startup();
- } catch (IOException e) {
- // TODO manage it
- LOG.error("Failed creating a source", e);
- }
- }
- }
-
- /**
- * Clear the references to the specified old source
- * @param src source to clear
- */
- public void closeRecoveredQueue(ReplicationSourceInterface src) {
- LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
- this.oldsources.remove(src);
- this.zkHelper.deleteSource(src.getPeerClusterZnode());
- }
-
- /**
- * Watcher used to be notified of the other region server's death
- * in the local cluster. It initiates the process to transfer the queues
- * if it is able to grab the lock.
- */
- public class OtherRegionServerWatcher implements Watcher {
- @Override
- public void process(WatchedEvent watchedEvent) {
- LOG.info(" event " + watchedEvent);
- if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
- watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
- return;
- }
-
- List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
- if (newRsList == null) {
- return;
- } else {
- synchronized (otherRegionServers) {
- otherRegionServers.clear();
- otherRegionServers.addAll(newRsList);
- }
- }
- if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
- LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
- String[] rsZnodeParts = watchedEvent.getPath().split("/");
- transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
- }
- }
- }
-
-}
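
The removed manager built its sources through the factory method above, which reads the implementation class from "replication.replicationsource.implementation" and falls back to ReplicationSource. A plausible way a test plugs in ReplicationSourceDummy (shown next) is sketched below; the configuration key comes from the removed code, while the surrounding test wiring is an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;

class PluggableSourceSketch {
  static Configuration dummySourceConf() {
    Configuration conf = HBaseConfiguration.create();
    // Key read by the removed ReplicationSourceManager.getReplicationSource()
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    return conf;
  }
}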
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Wed Sep 19 12:50:17 2012
@@ -1,77 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Source that does nothing at all, helpful to test ReplicationSourceManager
- */
-public class ReplicationSourceDummy implements ReplicationSourceInterface {
-
- ReplicationSourceManager manager;
- String peerClusterId;
- Path currentPath;
-
- @Override
- public void init(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, AtomicBoolean stopper,
- AtomicBoolean replicating, String peerClusterId)
- throws IOException {
- this.manager = manager;
- this.peerClusterId = peerClusterId;
- }
-
- @Override
- public void enqueueLog(Path log) {
- this.currentPath = log;
- }
-
- @Override
- public Path getCurrentPath() {
- return this.currentPath;
- }
-
- @Override
- public void startup() {
-
- }
-
- @Override
- public void terminate() {
-
- }
-
- @Override
- public String getPeerClusterZnode() {
- return peerClusterId;
- }
-
- @Override
- public void logArchived(Path oldPath, Path newPath) {
- }
-}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Sep 19 12:50:17 2012
@@ -1,472 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestReplication {
-
- private static final Log LOG = LogFactory.getLog(TestReplication.class);
-
- private static Configuration conf1;
- private static Configuration conf2;
-
- private static ZooKeeperWrapper zkw1;
- private static ZooKeeperWrapper zkw2;
-
- private static HTable htable1;
- private static HTable htable2;
-
- private static HBaseTestingUtility utility1;
- private static HBaseTestingUtility utility2;
- private static final int NB_ROWS_IN_BATCH = 100;
- private static final long SLEEP_TIME = 500;
- private static final int NB_RETRIES = 10;
-
- private static final byte[] tableName = Bytes.toBytes("test");
- private static final byte[] famName = Bytes.toBytes("f");
- private static final byte[] row = Bytes.toBytes("row");
- private static final byte[] noRepfamName = Bytes.toBytes("norep");
-
- /**
- * @throws java.lang.Exception
- */
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1 = HBaseConfiguration.create();
- conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
- // smaller block size and capacity to trigger more operations
- // and test them
- conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
- conf1.setInt("replication.source.nb.capacity", 5);
- conf1.setLong("replication.source.sleepforretries", 100);
- conf1.setInt("hbase.regionserver.maxlogs", 10);
- conf1.setLong("hbase.master.logcleaner.ttl", 10);
- conf1.setLong("hbase.client.retries.number", 4);
- conf1.setInt(HConstants.ZOOKEEPER_CONNECTION_RETRY_NUM, 2);
- conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
- conf1.setBoolean("dfs.support.append", true);
- conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-
- utility1 = new HBaseTestingUtility(conf1);
- utility1.startMiniZKCluster();
- MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
- zkw1.writeZNode("/1", "replication", "");
- zkw1.writeZNode("/1/replication", "master",
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/1");
- setIsReplication(true);
-
- LOG.info("Setup first Zk");
-
- conf2 = HBaseConfiguration.create();
- conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
- conf2.setInt("hbase.client.retries.number", 6);
- conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
- conf2.setBoolean("dfs.support.append", true);
- conf2.setInt(HConstants.ZOOKEEPER_CONNECTION_RETRY_NUM, 2);
- utility2 = new HBaseTestingUtility(conf2);
- utility2.setZkCluster(miniZK);
- zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
- zkw2.writeZNode("/2", "replication", "");
- zkw2.writeZNode("/2/replication", "master",
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/1");
-
- zkw1.writeZNode("/1/replication/peers", "1",
- conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf2.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/2");
-
- LOG.info("Setup second Zk");
-
- utility1.startMiniCluster(2);
- utility2.startMiniCluster(2);
-
- HTableDescriptor table = new HTableDescriptor(tableName);
- table.setDeferredLogFlush(false);
- HColumnDescriptor fam = new HColumnDescriptor(famName);
- fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- table.addFamily(fam);
- fam = new HColumnDescriptor(noRepfamName);
- table.addFamily(fam);
- HBaseAdmin admin1 = new HBaseAdmin(conf1);
- HBaseAdmin admin2 = new HBaseAdmin(conf2);
- admin1.createTable(table);
- admin2.createTable(table);
-
- htable1 = new HTable(conf1, tableName);
- htable1.setWriteBufferSize(1024*5);
- htable2 = new HTable(conf2, tableName);
- }
-
- private static void setIsReplication(boolean rep) throws Exception {
- LOG.info("Set rep " + rep);
- zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
- // Takes some ms for ZK to fire the watcher
- Thread.sleep(SLEEP_TIME);
- }
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- setIsReplication(false);
- utility1.truncateTable(tableName);
- utility2.truncateTable(tableName);
- // If the test is flaky, increase this sleep
- Thread.sleep(SLEEP_TIME*8);
- setIsReplication(true);
- }
-
- /**
- * @throws java.lang.Exception
- */
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
- }
-
- /**
- * Add a row, check that it is replicated, delete it, and check that the delete is replicated as well
- * @throws Exception
- */
- @Test
- public void testSimplePutDelete() throws Exception {
- LOG.info("testSimplePutDelete");
- Put put = new Put(row);
- put.add(famName, row, row);
-
- htable1 = new HTable(conf1, tableName);
- htable1.put(put);
-
- HTable table2 = new HTable(conf2, tableName);
- Get get = new Get(row);
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for put replication");
- }
- Result res = table2.get(get);
- if (res.size() == 0) {
- LOG.info("Row not available");
- Thread.sleep(SLEEP_TIME);
- } else {
- assertArrayEquals(res.value(), row);
- break;
- }
- }
-
- Delete del = new Delete(row);
- htable1.delete(del);
-
- table2 = new HTable(conf2, tableName);
- get = new Get(row);
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for del replication");
- }
- Result res = table2.get(get);
- if (res.size() >= 1) {
- LOG.info("Row not deleted");
- Thread.sleep(SLEEP_TIME);
- } else {
- break;
- }
- }
- }
-
- /**
- * Try a small batch upload using the write buffer and check that it is replicated
- * @throws Exception
- */
- @Test
- public void testSmallBatch() throws Exception {
- LOG.info("testSmallBatch");
- Put put;
- // normal Batch tests
- htable1.setAutoFlush(false);
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- put = new Put(Bytes.toBytes(i));
- put.add(famName, row, row);
- htable1.put(put);
- }
- htable1.flushCommits();
-
- Scan scan = new Scan();
-
- ResultScanner scanner1 = htable1.getScanner(scan);
- Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
- scanner1.close();
- assertEquals(NB_ROWS_IN_BATCH, res1.length);
-
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for normal batch replication");
- }
- ResultScanner scanner = htable2.getScanner(scan);
- Result[] res = scanner.next(NB_ROWS_IN_BATCH);
- scanner.close();
- if (res.length != NB_ROWS_IN_BATCH) {
- LOG.info("Only got " + res.length + " rows");
- Thread.sleep(SLEEP_TIME);
- } else {
- break;
- }
- }
-
- htable1.setAutoFlush(true);
-
- }
-
- /**
- * Test that stopping replication prevents new edits from being replicated,
- * then re-enable it and verify that replication works again
- * @throws Exception
- */
- @Test
- public void testStartStop() throws Exception {
-
- // Test stopping replication
- setIsReplication(false);
-
- Put put = new Put(Bytes.toBytes("stop start"));
- put.add(famName, row, row);
- htable1.put(put);
-
- Get get = new Get(Bytes.toBytes("stop start"));
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- break;
- }
- Result res = htable2.get(get);
- if(res.size() >= 1) {
- fail("Replication wasn't stopped");
-
- } else {
- LOG.info("Row not replicated, let's wait a bit more...");
- Thread.sleep(SLEEP_TIME);
- }
- }
-
- // Test restart replication
- setIsReplication(true);
-
- htable1.put(put);
-
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for put replication");
- }
- Result res = htable2.get(get);
- if(res.size() == 0) {
- LOG.info("Row not available");
- Thread.sleep(SLEEP_TIME);
- } else {
- assertArrayEquals(res.value(), row);
- break;
- }
- }
-
- put = new Put(Bytes.toBytes("do not rep"));
- put.add(noRepfamName, row, row);
- htable1.put(put);
-
- get = new Get(Bytes.toBytes("do not rep"));
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i == NB_RETRIES-1) {
- break;
- }
- Result res = htable2.get(get);
- if (res.size() >= 1) {
- fail("Not supposed to be replicated");
- } else {
- LOG.info("Row not replicated, let's wait a bit more...");
- Thread.sleep(SLEEP_TIME);
- }
- }
-
- }
-
- /**
- * Run a more intense version of testSmallBatch, one that will trigger
- * hlog rolling and other non-trivial code paths
- * @throws Exception
- */
- @Test
- public void loadTesting() throws Exception {
- htable1.setWriteBufferSize(1024);
- htable1.setAutoFlush(false);
- for (int i = 0; i < NB_ROWS_IN_BATCH *10; i++) {
- Put put = new Put(Bytes.toBytes(i));
- put.add(famName, row, row);
- htable1.put(put);
- }
- htable1.flushCommits();
-
- Scan scan = new Scan();
-
- ResultScanner scanner = htable1.getScanner(scan);
- Result[] res = scanner.next(NB_ROWS_IN_BATCH * 100);
- scanner.close();
-
- assertEquals(NB_ROWS_IN_BATCH *10, res.length);
-
- scan = new Scan();
-
- for (int i = 0; i < NB_RETRIES; i++) {
-
- scanner = htable2.getScanner(scan);
- res = scanner.next(NB_ROWS_IN_BATCH * 100);
- scanner.close();
- if (res.length != NB_ROWS_IN_BATCH *10) {
- if (i == NB_RETRIES-1) {
- int lastRow = -1;
- for (Result result : res) {
- int currentRow = Bytes.toInt(result.getRow());
- for (int row = lastRow+1; row < currentRow; row++) {
- LOG.error("Row missing: " + row);
- }
- lastRow = currentRow;
- }
- LOG.error("Last row: " + lastRow);
- fail("Waited too much time for normal batch replication, "
- + res.length + " instead of " + NB_ROWS_IN_BATCH *10);
- } else {
- LOG.info("Only got " + res.length + " rows");
- Thread.sleep(SLEEP_TIME);
- }
- } else {
- break;
- }
- }
- }
-
- /**
- * Load a table spread over multiple regions on 2 region servers and kill a
- * region server (and with it a replication source) during the upload.
- * The failover happens internally.
- * @throws Exception
- */
- @Test (timeout=300000)
- public void queueFailover() throws Exception {
- utility1.createMultiRegions(htable1, famName);
-
- // killing the RS with .META. can result in failed puts until we solve
- // IO fencing
- int rsToKill1 =
- utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
- int rsToKill2 =
- utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
-
- // Takes about 20 secs to run the full loading, kill around the middle
- Thread killer1 = killARegionServer(utility1, 5000, rsToKill1);
- Thread killer2 = killARegionServer(utility2, 5000, rsToKill2);
-
- LOG.info("Start loading table");
- int initialCount = utility1.loadTable(htable1, famName);
- LOG.info("Done loading table");
- killer1.join(5000);
- killer2.join(5000);
- LOG.info("Done waiting for threads");
-
- Result[] res = null;
- while (true) {
- try {
- Scan scan = new Scan();
- ResultScanner scanner = htable1.getScanner(scan);
- res = scanner.next(initialCount);
- scanner.close();
- break;
- } catch (UnknownScannerException ex) {
- LOG.info("Cluster wasn't ready yet, restarting scanner");
- }
- }
-
- // Check that we actually have all the rows; we may miss some because we
- // don't have IO fencing.
- if (res.length != initialCount) {
- LOG.warn("We lost some rows on the master cluster!");
- // We don't really expect the other cluster to have more rows
- initialCount = res.length;
- }
-
- Scan scan2 = new Scan();
-
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for queueFailover replication");
- }
- ResultScanner scanner2 = htable2.getScanner(scan2);
- Result[] res2 = scanner2.next(initialCount * 2);
- scanner2.close();
- if (res2.length < initialCount) {
- LOG.info("Only got " + res2.length + " rows instead of " +
- initialCount + " current i=" + i);
- Thread.sleep(SLEEP_TIME * 2);
- } else {
- break;
- }
- }
- }
-
- private static Thread killARegionServer(final HBaseTestingUtility utility,
- final long timeout, final int rs) {
- Thread killer = new Thread() {
- public void run() {
- try {
- Thread.sleep(timeout);
- utility.expireRegionServerSession(rs);
- } catch (Exception e) {
- LOG.error(e);
- }
- }
- };
- killer.start();
- return killer;
- }
-}
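
Every assertion in the removed TestReplication above relies on the same poll-and-retry idiom: read from the slave cluster until the edit shows up or the retry budget is exhausted. A minimal sketch of that idiom against the same-era client API (the class and method names below are illustrative only, not part of the removed code):

import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper, not part of the removed test: poll the slave cluster
// until a replicated value for the given row appears, or fail after the
// retry budget is used up.
final class ReplicationWait {
  private static final int NB_RETRIES = 10;
  private static final long SLEEP_TIME = 500;

  static void waitForRow(HTable slaveTable, byte[] row, byte[] expected)
      throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = slaveTable.get(get);
      if (res.size() > 0 && Bytes.equals(res.value(), expected)) {
        return; // the edit reached the slave cluster
      }
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for replication of " + Bytes.toString(row));
  }
}
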
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java Wed Sep 19 12:50:17 2012
@@ -1,111 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class TestReplicationSource {
-
- private static final Log LOG =
- LogFactory.getLog(TestReplicationSource.class);
- private final static HBaseTestingUtility TEST_UTIL =
- new HBaseTestingUtility();
- private static FileSystem fs;
- private static Path oldLogDir;
- private static Path logDir;
- private static Configuration conf = HBaseConfiguration.create();
-
- /**
- * @throws java.lang.Exception
- */
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniDFSCluster(1);
- fs = TEST_UTIL.getDFSCluster().getFileSystem();
-
- oldLogDir = TEST_UTIL.getTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
- fs.mkdirs(oldLogDir);
- logDir = TEST_UTIL.getTestDir(HConstants.HREGION_LOGDIR_NAME);
- fs.mkdirs(logDir);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- fs.delete(oldLogDir, true);
- fs.delete(logDir, true);
- }
-
- /**
- * Sanity check that we can move logs around while we are reading
- * from them. Should this test fail, ReplicationSource would have a hard
- * time reading logs that are being archived.
- * @throws Exception
- */
- @Test
- public void testLogMoving() throws Exception{
- Path logPath = new Path(logDir, "log");
- HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
- for(int i = 0; i < 3; i++) {
- byte[] b = Bytes.toBytes(Integer.toString(i));
- KeyValue kv = new KeyValue(b,b,b);
- WALEdit edit = new WALEdit();
- edit.add(kv);
- HLogKey key = new HLogKey(b, b, 0, 0);
- writer.append(new HLog.Entry(key, edit));
- writer.sync();
- }
- writer.close();
-
- HLog.Reader reader = HLog.getReader(fs, logPath, conf);
- HLog.Entry entry = reader.next();
- assertNotNull(entry);
-
- Path oldLogPath = new Path(oldLogDir, "log");
- fs.rename(logPath, oldLogPath);
-
- entry = reader.next();
- assertNotNull(entry);
-
- entry = reader.next();
- entry = reader.next();
-
- assertNull(entry);
-
- }
-}
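
The removed testLogMoving above checks the assumption that an HLog.Reader keeps returning entries after the underlying file has been renamed into the archive directory. If that guarantee did not hold, a log-tailing reader would have to retry the open against the archive directory itself; a rough sketch of such a fallback, with invented class and method names (this is not code from the removed ReplicationSource):

import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

// Illustrative fallback only: open a log by name, and if it has already been
// archived between listing and opening, retry in the old-logs directory.
final class LogOpenFallback {
  static HLog.Reader open(FileSystem fs, Path logDir, Path oldLogDir,
      String logName, Configuration conf) throws Exception {
    try {
      return HLog.getReader(fs, new Path(logDir, logName), conf);
    } catch (FileNotFoundException e) {
      // The log was moved to the archive directory; read it from there.
      return HLog.getReader(fs, new Path(oldLogDir, logName), conf);
    }
  }
}
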
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Sep 19 12:50:17 2012
@@ -1,209 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URLEncoder;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestReplicationSourceManager {
-
- private static final Log LOG =
- LogFactory.getLog(TestReplicationSourceManager.class);
-
- private static Configuration conf;
-
- private static HBaseTestingUtility utility;
-
- private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
-
- private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
-
- private static ReplicationSourceManager manager;
-
- private static ZooKeeperWrapper zkw;
-
- private static HTableDescriptor htd;
-
- private static HRegionInfo hri;
-
- private static final byte[] r1 = Bytes.toBytes("r1");
-
- private static final byte[] r2 = Bytes.toBytes("r2");
-
- private static final byte[] f1 = Bytes.toBytes("f1");
-
- private static final byte[] test = Bytes.toBytes("test");
-
- private static FileSystem fs;
-
- private static Path oldLogDir;
-
- private static Path logDir;
-
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
-
- conf = HBaseConfiguration.create();
- conf.set("replication.replicationsource.implementation",
- ReplicationSourceDummy.class.getCanonicalName());
- conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
- utility = new HBaseTestingUtility(conf);
- utility.startMiniZKCluster();
-
- zkw = ZooKeeperWrapper.createInstance(conf, "test");
- zkw.writeZNode("/hbase", "replication", "");
- final String clusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
- + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1";
- zkw.writeZNode("/hbase/replication", "master", clusterKey);
- zkw.writeZNode("/hbase/replication/peers", "1", clusterKey);
-
- // set port to 0 so that RS picks a port dynamically to
- // avoid a clash with the default port in unit test runs.
- conf.set(HConstants.REGIONSERVER_PORT, "0");
-
- HRegionServer server = new HRegionServer(conf);
- ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
- server.getZooKeeperWrapper(), conf,
- REPLICATING, "123456789");
- fs = FileSystem.get(conf);
- oldLogDir = new Path(utility.getTestDir(),
- HConstants.HREGION_OLDLOGDIR_NAME);
- logDir = new Path(utility.getTestDir(),
- HConstants.HREGION_LOGDIR_NAME);
-
- manager = new ReplicationSourceManager(helper,
- conf, STOPPER, fs, REPLICATING, oldLogDir);
- manager.addSource("1");
-
- htd = new HTableDescriptor(test);
- HColumnDescriptor col = new HColumnDescriptor("f1");
- col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- htd.addFamily(col);
- col = new HColumnDescriptor("f2");
- col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
- htd.addFamily(col);
-
- hri = new HRegionInfo(htd, r1, r2);
-
-
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- manager.join();
- utility.shutdownMiniCluster();
- }
-
- @Before
- public void setUp() throws Exception {
- fs.delete(logDir, true);
- fs.delete(oldLogDir, true);
- }
-
- @After
- public void tearDown() throws Exception {
- setUp();
- }
-
- @Test
- public void testLogRoll() throws Exception {
- long seq = 0;
- long baseline = 1000;
- long time = baseline;
- KeyValue kv = new KeyValue(r1, f1, r1);
- WALEdit edit = new WALEdit();
- edit.add(kv);
-
- HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
- URLEncoder.encode("regionserver:60020", "UTF8"));
-
- manager.init();
-
- // Testing normal log rolling every 20
- for(long i = 1; i < 101; i++) {
- if(i > 1 && i % 20 == 0) {
- hlog.rollWriter();
- }
- LOG.info(i);
- HLogKey key = new HLogKey(hri.getRegionName(),
- test, seq++, System.currentTimeMillis());
- hlog.append(hri, key, edit);
- }
-
- // Simulate a rapid burst of inserts that is followed
- // by a report that is still not totally complete (missing the last one)
- LOG.info(baseline + " and " + time);
- baseline += 101;
- time = baseline;
- LOG.info(baseline + " and " + time);
-
- for (int i = 0; i < 3; i++) {
- HLogKey key = new HLogKey(hri.getRegionName(),
- test, seq++, System.currentTimeMillis());
- hlog.append(hri, key, edit);
- }
-
- assertEquals(6, manager.getHLogs().size());
-
- hlog.rollWriter();
-
- manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
-
- HLogKey key = new HLogKey(hri.getRegionName(),
- test, seq++, System.currentTimeMillis());
- hlog.append(hri, key, edit);
-
- assertEquals(1, manager.getHLogs().size());
-
-
- // TODO Need a case with only 2 HLogs and we only want to delete the first one
- }
-
-}
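
The removed testLogRoll above exercises queue bookkeeping: the manager accumulates one entry per rolled HLog, and reporting a position in the current log lets it discard everything older, which is why the test expects 6 tracked logs before the final roll and cleanup and 1 afterwards. A conceptual sketch of that accounting (invented names, not the removed ReplicationSourceManager implementation):

import java.util.SortedSet;
import java.util.TreeSet;

// Conceptual sketch only: track rolled logs and drop the ones that sort
// strictly before the log a source last reported a position in.
final class HLogQueueSketch {
  private final SortedSet<String> hlogs = new TreeSet<String>();

  // Called whenever a new writer is created by a log roll.
  void logRolled(String newLog) {
    hlogs.add(newLog);
  }

  // Called when a source has durably recorded a position in currentLog;
  // everything sorted before it is no longer needed.
  void positionLogged(String currentLog) {
    hlogs.headSet(currentLog).clear();
  }

  int size() {
    return hlogs.size();
  }
}
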