Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/10 08:38:35 UTC
[2/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 457d859..db98083 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -85,17 +85,16 @@ public interface WALActionsListener {
);
/**
- *
* @param htd
* @param logKey
- * @param logEdit
- * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
- * It only exists to get scope when replicating. Scope should be in the WALKey and not need
- * us passing in a <code>htd</code>.
+ * @param logEdit TODO: Retire this in favor of
+ * {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
+ * scope when replicating. Scope should be in the WALKey and not need us passing in a
+ * <code>htd</code>.
+ * @throws IOException If the WALEdit fails to parse
*/
- void visitLogEntryBeforeWrite(
- HTableDescriptor htd, WALKey logKey, WALEdit logEdit
- );
+ void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException;
/**
* For notification post append to the writer. Used by metrics system at least.
@@ -136,7 +135,9 @@ public interface WALActionsListener {
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException {
+ }
@Override
public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
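For illustration, a minimal sketch of a listener written against the updated interface. The empty Base class above absorbs the new checked exception, so subclasses override only what they need; the class name and the auditing behaviour here are hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.wal.WALKey;

    public class BulkLoadAuditListener extends WALActionsListener.Base {
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
          throws IOException {
        for (Cell cell : logEdit.getCells()) {
          if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
            // Parsing the descriptor can fail, which is why the interface now declares IOException.
            WALEdit.getBulkLoadDescriptor(cell);
          }
        }
      }
    }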
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 3501f3e..f97ec15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,13 +18,21 @@
package org.apache.hadoop.hbase.replication;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.NavigableMap;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
@@ -32,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter {
+ private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
@Override
public Entry filter(Entry entry) {
@@ -41,13 +50,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
}
ArrayList<Cell> cells = entry.getEdit().getCells();
int size = cells.size();
+ byte[] fam;
for (int i = size - 1; i >= 0; i--) {
Cell cell = cells.get(i);
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- byte[] fam = CellUtil.cloneFamily(cell);
- if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- cells.remove(i);
+ // If a bulk load entry has a scope, the user has enabled replication for bulk loaded
+ // hfiles.
+ // TODO There is similar logic in TableCfWALEntryFilter but the data structures differ, so
+ // they cannot be refactored into one now; revisit to see if there is a way to unify them.
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
+ if (filteredBulkLoadEntryCell != null) {
+ cells.set(i, filteredBulkLoadEntryCell);
+ } else {
+ cells.remove(i);
+ }
+ } else {
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ fam = CellUtil.cloneFamily(cell);
+ if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ cells.remove(i);
+ }
}
}
if (cells.size() < size / 2) {
@@ -56,4 +79,41 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
return entry;
}
+ private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
+ byte[] fam;
+ BulkLoadDescriptor bld = null;
+ try {
+ bld = WALEdit.getBulkLoadDescriptor(cell);
+ } catch (IOException e) {
+ LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+ return cell;
+ }
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ // Copy the StoreDescriptor list and update it, as storesList is an unmodifiable list
+ List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+ Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+ boolean anyStoreRemoved = false;
+ while (copiedStoresListIterator.hasNext()) {
+ StoreDescriptor sd = copiedStoresListIterator.next();
+ fam = sd.getFamilyName().toByteArray();
+ if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ copiedStoresListIterator.remove();
+ anyStoreRemoved = true;
+ }
+ }
+
+ if (!anyStoreRemoved) {
+ return cell;
+ } else if (copiedStoresList.isEmpty()) {
+ return null;
+ }
+ BulkLoadDescriptor.Builder newDesc =
+ BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+ .setEncodedRegionName(bld.getEncodedRegionName())
+ .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+ newDesc.addAllStores(copiedStoresList);
+ BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+ return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+ cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ }
}
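A rough usage sketch: the replication source applies this filter to every entry read from the WAL. A null result means nothing in the entry is replicable; otherwise the entry may come back with its bulk load marker cell rewritten to list only the stores whose families carry a non-local scope. The entry variable is assumed to be a WAL.Entry read from the log:

    WALEntryFilter filter = new ScopeWALEntryFilter();
    Entry filtered = filter.filter(entry);
    if (filtered == null) {
      // Skip shipping this entry: no family in it has a replicable scope.
    }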
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 642ee8a..f10849b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,14 +18,20 @@
package org.apache.hadoop.hbase.replication;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -52,19 +58,36 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
}
int size = cells.size();
+ // If null, the user has explicitly not configured any table CFs, so data from all tables is
+ // applicable for replication
+ if (tableCFs == null) {
+ return entry;
+ }
// return null(prevent replicating) if logKey's table isn't in this peer's
- // replicable table list (empty tableCFs means all table are replicable)
- if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+ // replicable table list
+ if (!tableCFs.containsKey(tabName)) {
return null;
} else {
- List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+ List<String> cfs = tableCFs.get(tabName);
for (int i = size - 1; i >= 0; i--) {
Cell cell = cells.get(i);
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if ((cfs != null) && !cfs.contains(Bytes.toString(
- cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
- cells.remove(i);
+ // TODO There is similar logic in ScopeWALEntryFilter but the data structures differ, so
+ // they cannot be refactored into one now; revisit to see if there is a way to unify them.
+ // Filter bulk load entries separately
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
+ if (filteredBulkLoadEntryCell != null) {
+ cells.set(i, filteredBulkLoadEntryCell);
+ } else {
+ cells.remove(i);
+ }
+ } else {
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (empty cfs means all cfs of this table are replicable)
+ if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
+ cell.getFamilyOffset(), cell.getFamilyLength()))) {
+ cells.remove(i);
+ }
}
}
}
@@ -74,4 +97,41 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
return entry;
}
+ private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
+ byte[] fam;
+ BulkLoadDescriptor bld = null;
+ try {
+ bld = WALEdit.getBulkLoadDescriptor(cell);
+ } catch (IOException e) {
+ LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+ return cell;
+ }
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ // Copy the StoreDescriptor list and update it, as storesList is an unmodifiable list
+ List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+ Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+ boolean anyStoreRemoved = false;
+ while (copiedStoresListIterator.hasNext()) {
+ StoreDescriptor sd = copiedStoresListIterator.next();
+ fam = sd.getFamilyName().toByteArray();
+ if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+ copiedStoresListIterator.remove();
+ anyStoreRemoved = true;
+ }
+ }
+
+ if (!anyStoreRemoved) {
+ return cell;
+ } else if (copiedStoresList.isEmpty()) {
+ return null;
+ }
+ BulkLoadDescriptor.Builder newDesc =
+ BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+ .setEncodedRegionName(bld.getEncodedRegionName())
+ .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+ newDesc.addAllStores(copiedStoresList);
+ BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+ return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+ cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ }
}
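For reference, a sketch of the per-peer table-CF map this filter consumes, with semantics inferred from the code above (table and family names are illustrative):

    // null map          -> replicate everything (early return above)
    // table not in map  -> drop the whole entry (filter returns null)
    // table -> null     -> replicate all column families of that table
    // table -> [cf...]  -> replicate only the listed families, including the
    //                      matching stores inside bulk load marker cells
    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
    tableCFs.put(TableName.valueOf("t1"), Arrays.asList("cf1", "cf2"));
    tableCFs.put(TableName.valueOf("t2"), null); // all CFs of t2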
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
new file mode 100644
index 0000000..9bfea4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
+ * before deleting it from the hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+ private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
+ private ZooKeeperWatcher zkw;
+ private ReplicationQueuesClient rqc;
+ private boolean stopped = false;
+ private boolean aborted;
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ // all members of this class are null if replication is disabled,
+ // so we cannot filter the files
+ if (this.getConf() == null) {
+ return files;
+ }
+
+ final Set<String> hfileRefs;
+ try {
+ // The concurrently created new hfile entries in ZK may not be included in the return list,
+ // but they won't be deleted because they're not in the checking set.
+ hfileRefs = loadHFileRefsFromPeers();
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
+ return Collections.emptyList();
+ }
+ return Iterables.filter(files, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus file) {
+ String hfile = file.getPath().getName();
+ boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+ if (LOG.isDebugEnabled()) {
+ if (foundHFileRefInQueue) {
+ LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+ } else {
+ LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
+ }
+ }
+ return !foundHFileRefInQueue;
+ }
+ });
+ }
+
+ /**
+ * Load all hfile references in all replication queues from ZK. This method is guaranteed to
+ * return a snapshot containing all hfile references present in zookeeper at the start of the
+ * call; however, references created while the call is in progress may not be included.
+ */
+ private Set<String> loadHFileRefsFromPeers() throws KeeperException {
+ Set<String> hfileRefs = Sets.newHashSet();
+ List<String> listOfPeers;
+ for (int retry = 0;; retry++) {
+ int v0 = rqc.getHFileRefsNodeChangeVersion();
+ hfileRefs.clear();
+ listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
+ if (listOfPeers == null) {
+ LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
+ return ImmutableSet.of();
+ }
+ for (String id : listOfPeers) {
+ List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
+ if (peerHFileRefs != null) {
+ hfileRefs.addAll(peerHFileRefs);
+ }
+ }
+ int v1 = rqc.getHFileRefsNodeChangeVersion();
+ if (v0 == v1) {
+ return hfileRefs;
+ }
+ LOG.debug(String.format("Replication hfile references node cversion changed from "
+ + "%d to %d, retry = %d", v0, v1, retry));
+ }
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ // If either replication or replication of bulk load hfiles is disabled, keep all members null
+ if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
+ HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
+ HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
+ LOG.warn(HConstants.REPLICATION_ENABLE_KEY + " or " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+ " is not enabled, so allowing all hfile references to be deleted. Better to remove "
+ ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
+ " configuration.");
+ return;
+ }
+ // Make my own Configuration. Then I'll have my own connection to zk that
+ // I can close myself when time comes.
+ Configuration conf = new Configuration(config);
+ super.setConf(conf);
+ try {
+ initReplicationQueuesClient(conf);
+ } catch (IOException e) {
+ LOG.error("Error while configuring " + this.getClass().getName(), e);
+ }
+ }
+
+ private void initReplicationQueuesClient(Configuration conf)
+ throws ZooKeeperConnectionException, IOException {
+ this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null);
+ this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ this.stopped = true;
+ if (this.zkw != null) {
+ LOG.info("Stopping " + this.zkw);
+ this.zkw.close();
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+ this.aborted = true;
+ stop(why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+
+ @Override
+ public boolean isFileDeletable(FileStatus fStat) {
+ Set<String> hfileRefsFromQueue;
+ // all members of this class are null if replication is disabled,
+ // so do not stop from deleting the file
+ if (getConf() == null) {
+ return true;
+ }
+
+ try {
+ hfileRefsFromQueue = loadHFileRefsFromPeers();
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
+ + "file for " + fStat.getPath());
+ return false;
+ }
+ return !hfileRefsFromQueue.contains(fStat.getPath().getName());
+ }
+}
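Note that this cleaner does not have to be wired up by hand: decorateMasterConfiguration() in Replication.java (later in this patch) appends it to the master's hfile cleaner plugins when bulk load replication is enabled. A sketch of the switches involved, using the HConstants keys referenced above; the cluster id value is illustrative:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Must be non-empty when bulk load replication is enabled (see Replication.java below):
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source1");
    Replication.decorateMasterConfiguration(conf);
    // conf now lists ReplicationHFileCleaner under HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS.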
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8d5c6d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This will load all the xml configuration files for the source cluster replication ID from the
+ * user-configured replication configuration directory.
+ */
+@InterfaceAudience.Private
+public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+ private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
+ // Map of all source cluster configurations, keyed by replication cluster id
+ private Map<String, Configuration> sourceClustersConfs = new HashMap<>();
+ private static final String XML = ".xml";
+
+ @Override
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException {
+ if (sourceClustersConfs.get(replicationClusterId) == null) {
+ synchronized (this.sourceClustersConfs) {
+ if (sourceClustersConfs.get(replicationClusterId) == null) {
+ LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
+ // Load only user provided client configurations.
+ Configuration sourceClusterConf = new Configuration(false);
+
+ String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
+ if (replicationConfDir == null) {
+ LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
+ URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
+ if (resource != null) {
+ String path = resource.getPath();
+ replicationConfDir = path.substring(0, path.lastIndexOf("/"));
+ } else {
+ replicationConfDir = System.getenv("HBASE_CONF_DIR");
+ }
+ }
+
+ LOG.info("Loading source cluster " + replicationClusterId
+ + " file system configurations from xml files under directory " + replicationConfDir);
+ File confDir = new File(replicationConfDir, replicationClusterId);
+ String[] listofConfFiles = FileUtil.list(confDir);
+ for (String confFile : listofConfFiles) {
+ if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
+ // Add all the user provided client conf files
+ sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
+ }
+ }
+ this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
+ }
+ }
+ }
+ return this.sourceClustersConfs.get(replicationClusterId);
+ }
+
+}
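The provider resolves configurations from a subdirectory named after the replication cluster id. Under the fallback rules above, a layout like the following would be picked up (all names illustrative):

    <value of HConstants.REPLICATION_CONF_DIR>/
        source1/            <- must match the replicationClusterId argument
            core-site.xml   <- every regular *.xml file here is added as a resource
            hdfs-site.xml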
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7c07ecc..d51d512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -37,24 +37,26 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
* implementation for replicating to another HBase cluster.
* For the slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
@@ -84,8 +86,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
+ private String replicationClusterId = "";
private ThreadPoolExecutor exec;
private int maxThreads;
+ private Path baseNamespaceDir;
+ private Path hfileArchiveDir;
+ private boolean replicationBulkLoadDataEnabled;
@Override
public void init(Context context) throws IOException {
@@ -108,7 +114,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
+ new SynchronousQueue<Runnable>());
+
+ this.replicationBulkLoadDataEnabled =
+ conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ if (this.replicationBulkLoadDataEnabled) {
+ replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
+ }
+ // Construct base namespace directory and hfile archive directory path
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+ baseNamespaceDir = new Path(rootDir, baseNSDir);
+ hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
}
private void decorateConf() {
@@ -317,8 +335,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new Entry[entries.size()]));
+ ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return ordinal;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
new file mode 100644
index 0000000..17f6780
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * It is used for replicating HFile entries. It first copies all the hfiles in parallel to a
+ * local staging directory and then uses {@link LoadIncrementalHFiles} to prepare a collection of
+ * {@link LoadQueueItem} which are finally loaded (replicated) into the table of this cluster.
+ */
+@InterfaceAudience.Private
+public class HFileReplicator {
+ /** Maximum number of threads to allow in pool to copy hfiles during replication */
+ public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
+ "hbase.replication.bulkload.copy.maxthreads";
+ public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
+ /** Number of hfiles to copy per thread during replication */
+ public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
+ "hbase.replication.bulkload.copy.hfiles.perthread";
+ public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
+
+ private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
+ private final String UNDERSCORE = "_";
+ private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+
+ private Configuration sourceClusterConf;
+ private String sourceBaseNamespaceDirPath;
+ private String sourceHFileArchiveDirPath;
+ private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
+ private FileSystem sinkFs;
+ private FsDelegationToken fsDelegationToken;
+ private UserProvider userProvider;
+ private Configuration conf;
+ private Connection connection;
+ private String hbaseStagingDir;
+ private ThreadPoolExecutor exec;
+ private int maxCopyThreads;
+ private int copiesPerThread;
+
+ public HFileReplicator(Configuration sourceClusterConf,
+ String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
+ Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
+ Connection connection) throws IOException {
+ this.sourceClusterConf = sourceClusterConf;
+ this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
+ this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
+ this.bulkLoadHFileMap = tableQueueMap;
+ this.conf = conf;
+ this.connection = connection;
+
+ userProvider = UserProvider.instantiate(conf);
+ fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
+ this.maxCopyThreads =
+ this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
+ REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("HFileReplicationCallable-%1$d");
+ this.exec =
+ new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), builder.build());
+ this.exec.allowCoreThreadTimeOut(true);
+ this.copiesPerThread =
+ conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
+ REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
+
+ sinkFs = FileSystem.get(conf);
+ }
+
+ public Void replicate() throws IOException {
+ // Copy all the hfiles to the local file system
+ Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();
+
+ int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+
+ for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
+ String tableNameString = tableStagingDir.getKey();
+ Path stagingDir = tableStagingDir.getValue();
+
+ LoadIncrementalHFiles loadHFiles = null;
+ try {
+ loadHFiles = new LoadIncrementalHFiles(conf);
+ } catch (Exception e) {
+ LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
+ + " data.", e);
+ throw new IOException(e);
+ }
+ Configuration newConf = HBaseConfiguration.create(conf);
+ newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+ loadHFiles.setConf(newConf);
+
+ TableName tableName = TableName.valueOf(tableNameString);
+ Table table = this.connection.getTable(tableName);
+
+ // Prepare collection of queue of hfiles to be loaded(replicated)
+ Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+ loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
+
+ if (queue.isEmpty()) {
+ LOG.warn("Replication process did not find any files to replicate in directory "
+ + stagingDir.toUri());
+ return null;
+ }
+
+ try (RegionLocator locator = connection.getRegionLocator(tableName)) {
+
+ fsDelegationToken.acquireDelegationToken(sinkFs);
+
+ // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
+ // data
+ loadHFiles.setBulkToken(stagingDir.toString());
+
+ doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
+ } finally {
+ cleanup(stagingDir.toString(), table);
+ }
+ }
+ return null;
+ }
+
+ private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
+ Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
+ int count = 0;
+ Pair<byte[][], byte[][]> startEndKeys;
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ startEndKeys = locator.getStartEndKeys();
+ if (count != 0) {
+ LOG.warn("Error occured while replicating HFiles, retry attempt " + count + " with "
+ + queue.size() + " files still remaining to replicate.");
+ }
+
+ if (maxRetries != 0 && count >= maxRetries) {
+ throw new IOException("Retry attempted " + count
+ + " times without completing, bailing out.");
+ }
+ count++;
+
+ // Try bulk load
+ loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
+ }
+ }
+
+ private void cleanup(String stagingDir, Table table) {
+ // Release the file system delegation token
+ fsDelegationToken.releaseDelegationToken();
+ // Delete the staging directory
+ if (stagingDir != null) {
+ try {
+ sinkFs.delete(new Path(stagingDir), true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete the staging directory " + stagingDir, e);
+ }
+ }
+ // Do not close the file system
+
+ /*
+ * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
+ * "Failed to close the file system"); } }
+ */
+
+ // Close the table
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close the table.", e);
+ }
+ }
+ }
+
+ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
+ Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
+ Pair<byte[], List<String>> familyHFilePathsPair;
+ List<String> hfilePaths;
+ byte[] family;
+ Path familyStagingDir;
+ int familyHFilePathsPairsListSize;
+ int totalNoOfHFiles;
+ List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
+ FileSystem sourceFs = null;
+
+ try {
+ Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
+ /*
+ * Path#getFileSystem will by default get the FS from cache. If both source and sink clusters
+ * have the same FS name service then it will return the peer cluster FS. To avoid this we explicitly
+ * disable the loading of FS from cache, so that a new FS is created with source cluster
+ * configuration.
+ */
+ String sourceScheme = sourceClusterPath.toUri().getScheme();
+ String disableCacheName =
+ String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
+ sourceClusterConf.setBoolean(disableCacheName, true);
+
+ sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);
+
+ User user = userProvider.getCurrent();
+ // For each table name in the map
+ for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
+ .entrySet()) {
+ String tableName = tableEntry.getKey();
+
+ // Create staging directory for each table
+ Path stagingDir =
+ createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));
+
+ familyHFilePathsPairsList = tableEntry.getValue();
+ familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();
+
+ // For each list of family hfile paths pair in the table
+ for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
+ familyHFilePathsPair = familyHFilePathsPairsList.get(i);
+
+ family = familyHFilePathsPair.getFirst();
+ hfilePaths = familyHFilePathsPair.getSecond();
+
+ familyStagingDir = new Path(stagingDir, Bytes.toString(family));
+ totalNoOfHFiles = hfilePaths.size();
+
+ // For each list of hfile paths for the family
+ List<Future<Void>> futures = new ArrayList<Future<Void>>();
+ Callable<Void> c;
+ Future<Void> future;
+ int currentCopied = 0;
+ // Copy the hfiles in parallel
+ while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
+ c =
+ new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+ currentCopied + this.copiesPerThread));
+ future = exec.submit(c);
+ futures.add(future);
+ currentCopied += this.copiesPerThread;
+ }
+
+ int remaining = totalNoOfHFiles - currentCopied;
+ if (remaining > 0) {
+ c =
+ new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+ currentCopied + remaining));
+ future = exec.submit(c);
+ futures.add(future);
+ }
+
+ for (Future<Void> f : futures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ InterruptedIOException iioe =
+ new InterruptedIOException(
+ "Failed to copy HFiles to local file system. This will be retried again "
+ + "by the source cluster.");
+ iioe.initCause(e);
+ throw iioe;
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to copy HFiles to local file system. This will "
+ + "be retried again by the source cluster.", e);
+ }
+ }
+ }
+ // Add the staging directory to this table. Staging directory contains all the hfiles
+ // belonging to this table
+ mapOfCopiedHFiles.put(tableName, stagingDir);
+ }
+ return mapOfCopiedHFiles;
+ } finally {
+ if (sourceFs != null) {
+ sourceFs.close();
+ }
+ if(exec != null) {
+ exec.shutdown();
+ }
+ }
+ }
+
+ private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
+ String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
+ int RANDOM_WIDTH = 320;
+ int RANDOM_RADIX = 32;
+ String doubleUnderScore = UNDERSCORE + UNDERSCORE;
+ String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
+ + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
+ return createStagingDir(baseDir, user, randomDir);
+ }
+
+ private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
+ Path p = new Path(baseDir, randomDir);
+ sinkFs.mkdirs(p, PERM_ALL_ACCESS);
+ sinkFs.setPermission(p, PERM_ALL_ACCESS);
+ return p;
+ }
+
+ /**
+ * This class will copy the given hfiles from the given source file system to the given local file
+ * system staging directory.
+ */
+ private class Copier implements Callable<Void> {
+ private FileSystem sourceFs;
+ private Path stagingDir;
+ private List<String> hfiles;
+
+ public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
+ throws IOException {
+ this.sourceFs = sourceFs;
+ this.stagingDir = stagingDir;
+ this.hfiles = hfiles;
+ }
+
+ @Override
+ public Void call() throws IOException {
+ Path sourceHFilePath;
+ Path localHFilePath;
+ int totalHFiles = hfiles.size();
+ for (int i = 0; i < totalHFiles; i++) {
+ sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
+ localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
+ try {
+ FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+ // If any exception other than FNFE is thrown, we fail the replication request and the
+ // source will retry replicating this data.
+ } catch (FileNotFoundException e) {
+ LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ + ". Trying to copy from hfile archive directory.",
+ e);
+ sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));
+
+ try {
+ FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+ } catch (FileNotFoundException e1) {
+ // This means the hfile does not exist anywhere in the source cluster FS, so we
+ // cannot do anything here; just log and return.
+ LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ + ". Hence ignoring this hfile from replication..",
+ e1);
+ return null;
+ }
+ }
+ sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
+ }
+ return null;
+ }
+ }
+}
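A rough sketch of how a sink might drive this class; the map shape follows the constructor above (table name to a list of (family, relative hfile paths) pairs), and every concrete value is illustrative:

    Map<String, List<Pair<byte[], List<String>>>> queueMap =
        new HashMap<String, List<Pair<byte[], List<String>>>>();
    queueMap.put("t1", Arrays.asList(
        new Pair<byte[], List<String>>(Bytes.toBytes("cf1"),
            Arrays.asList("t1/region1/cf1/hfile1"))));
    HFileReplicator replicator = new HFileReplicator(
        sourceClusterConf,                   // from a SourceFSConfigurationProvider
        "hdfs://source/hbase/data",          // source base namespace dir
        "hdfs://source/hbase/archive/data",  // source hfile archive dir
        queueMap, sinkConf, sinkConnection);
    replicator.replicate(); // copies the hfiles in parallel, then bulk loads them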
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 37dc1dd..f308daf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -47,7 +47,7 @@ public class MetricsSink {
if (lastTimestampForAge != timestamp) {
lastTimestampForAge = timestamp;
age = System.currentTimeMillis() - lastTimestampForAge;
- }
+ }
mss.setLastAppliedOpAge(age);
return age;
}
@@ -72,6 +72,17 @@ public class MetricsSink {
}
/**
+ * Convenience method to change metrics when a batch of operations is applied.
+ *
+ * @param batchSize total number of mutations that are applied/replicated
+ * @param hfileSize total number of hfiles that are applied/replicated
+ */
+ public void applyBatch(long batchSize, long hfileSize) {
+ applyBatch(batchSize);
+ mss.incrAppliedHFiles(hfileSize);
+ }
+
+ /**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
*/
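The sink presumably calls the new overload once per replicated batch, passing zero when the batch carried no bulk loaded files (a sketch with hypothetical counts):

    metricsSink.applyBatch(totalMutations, hfilesInBatch); // hfilesInBatch may be 0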
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index f9f7001..9687af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -40,11 +40,13 @@ public class MetricsSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private int lastQueueSize = 0;
+ private long lastHFileRefsQueueSize = 0;
private String id;
private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
+
/**
* Constructor used to register the metrics
*
@@ -143,6 +145,18 @@ public class MetricsSource {
globalSourceSource.incrShippedKBs(sizeInKB);
}
+ /**
+ * Convenience method to apply changes to metrics due to shipping a batch of logs.
+ *
+ * @param batchSize the size of the batch that was shipped to sinks.
+ * @param sizeInKB the size, in kilobytes, of the batch that was shipped to sinks.
+ * @param hfiles total number of hfiles shipped to sinks.
+ */
+ public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
+ shipBatch(batchSize, sizeInKB);
+ singleSourceSource.incrHFilesShipped(hfiles);
+ globalSourceSource.incrHFilesShipped(hfiles);
+ }
+
/** increase the byte number read by source from log file */
public void incrLogReadInBytes(long readInBytes) {
singleSourceSource.incrLogReadInBytes(readInBytes);
@@ -153,8 +167,10 @@ public class MetricsSource {
public void clear() {
singleSourceSource.clear();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+ globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear();
lastQueueSize = 0;
+ lastHFileRefsQueueSize = 0;
}
/**
@@ -194,4 +210,19 @@ public class MetricsSource {
public String getPeerID() {
return id;
}
+
+ public void incrSizeOfHFileRefsQueue(long size) {
+ singleSourceSource.incrSizeOfHFileRefsQueue(size);
+ globalSourceSource.incrSizeOfHFileRefsQueue(size);
+ lastHFileRefsQueueSize = size;
+ }
+
+ public void decrSizeOfHFileRefsQueue(int size) {
+ singleSourceSource.decrSizeOfHFileRefsQueue(size);
+ globalSourceSource.decrSizeOfHFileRefsQueue(size);
+ lastHFileRefsQueueSize -= size;
+ if (lastHFileRefsQueueSize < 0) {
+ lastHFileRefsQueueSize = 0;
+ }
+ }
}
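On the source side these presumably pair up as follows (a sketch; the variable names are hypothetical):

    // When bulk load hfile references are queued in ZK for this peer:
    metrics.incrSizeOfHFileRefsQueue(addedRefs.size());
    // When a shipped batch included hfiles:
    metrics.shipBatch(opCount, sizeInKB, shippedHFiles);
    metrics.decrSizeOfHFileRefsQueue(shippedHFiles);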
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b3db0f6..30153f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -649,8 +649,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- ReplicationProtbufUtil.buildReplicateWALEntryRequest(
- entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+ ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
+ .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b396dfc..d2a0776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,7 +45,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +76,7 @@ public class Replication extends WALActionsListener.Base implements
private static final Log LOG =
LogFactory.getLog(Replication.class);
private boolean replication;
+ private boolean replicationForBulkLoadData;
private ReplicationSourceManager replicationManager;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
@@ -84,7 +90,6 @@ public class Replication extends WALActionsListener.Base implements
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
-
/**
* Instantiate the replication management (if rep is enabled).
* @param server Hosting server
@@ -109,11 +114,20 @@ public class Replication extends WALActionsListener.Base implements
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
+ this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
.setDaemon(true)
.build());
+ if (this.replicationForBulkLoadData) {
+ if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
+ || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
+ throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
+ + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+ + " is set to true.");
+ }
+ }
if (replication) {
try {
this.replicationQueues =
@@ -158,6 +172,15 @@ public class Replication extends WALActionsListener.Base implements
return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
}
+ /**
+ * @param c Configuration to look at
+ * @return True if replication for bulk load data is enabled.
+ */
+ public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+ return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ }
+
/*
* Returns an object to listen to new wal changes
**/
@@ -187,14 +210,22 @@ public class Replication extends WALActionsListener.Base implements
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
- * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
- * do not contain the Cells we are replicating; they are passed here on the side in this
- * CellScanner).
+ * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
+ * contain the Cells we are replicating; they are passed here on the side in this
+ * CellScanner).
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+ * directory required for replicating hfiles
+ * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
* @throws IOException
*/
- public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
+ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
+ String replicationClusterId, String sourceBaseNamespaceDirPath,
+ String sourceHFileArchiveDirPath) throws IOException {
if (this.replication) {
- this.replicationSink.replicateEntries(entries, cells);
+ this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+ sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}
}
@@ -226,34 +257,44 @@ public class Replication extends WALActionsListener.Base implements
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
- WALEdit logEdit) {
- scopeWALEdits(htd, logKey, logEdit);
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException {
+ scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
}
/**
- * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
- * from compaction WAL edits and if the scope is local.
+ * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
+ * from compaction WAL edits, or when the scope is local.
* @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
+ * @param conf active configuration, used to check whether replication of bulk load data is enabled
+ * @param replicationManager Manager used to add bulk load event hfile references to the queue
+ * @throws IOException If the WALEdit fails to parse
*/
- public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
- WALEdit logEdit) {
- NavigableMap<byte[], Integer> scopes =
- new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
+ Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
+ boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
for (Cell cell : logEdit.getCells()) {
- family = CellUtil.cloneFamily(cell);
- // This is expected and the KV should not be replicated
- if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
- // Unexpected, has a tendency to happen in unit tests
- assert htd.getFamily(family) != null;
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+ } else {
+ // Skip the flush/compaction/region events
+ continue;
+ }
+ } else {
+ family = CellUtil.cloneFamily(cell);
+ // Unexpected, has a tendency to happen in unit tests
+ assert htd.getFamily(family) != null;
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL &&
- !scopes.containsKey(family)) {
- scopes.put(family, scope);
+ if (!scopes.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ scopes.put(family, scope);
+ }
+ }
}
}
if (!scopes.isEmpty()) {
@@ -261,6 +302,40 @@ public class Replication extends WALActionsListener.Base implements
}
}
+ private static void scopeBulkLoadEdits(HTableDescriptor htd,
+ ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
+ TableName tableName, Cell cell) throws IOException {
+ byte[] family;
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ for (StoreDescriptor s : bld.getStoresList()) {
+ family = s.getFamilyName().toByteArray();
+ if (!scopes.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ scopes.put(family, scope);
+ addHFileRefsToQueue(replicationManager, tableName, family, s);
+ }
+ } else {
+ addHFileRefsToQueue(replicationManager, tableName, family, s);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get bulk load events information from the wal file.", e);
+ throw e;
+ }
+ }
+
+ private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
+ TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+ try {
+ replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+ } catch (ReplicationException e) {
+ LOG.error("Failed to create hfile references in ZK.", e);
+ throw new IOException(e);
+ }
+ }
+
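
A bulk load marker travels in the WAL as a METAFAMILY cell carrying the
WALEdit.BULK_LOAD qualifier; a standalone sketch of the detection step used in
scopeWALEdits above (no new behavior, just the two checks from this patch):

    // Sketch: a cell is a bulk load marker iff it is in the meta family
    // and carries the bulk load qualifier.
    static boolean isBulkLoadMarker(Cell cell) {
      return CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)
          && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD);
    }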
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
getReplicationManager().preLogRoll(newPath);
@@ -272,8 +347,7 @@ public class Replication extends WALActionsListener.Base implements
}
/**
- * This method modifies the master's configuration in order to inject
- * replication-related features
+ * This method modifies the master's configuration to inject replication-related features.
* @param conf
*/
public static void decorateMasterConfiguration(Configuration conf) {
@@ -285,6 +359,13 @@ public class Replication extends WALActionsListener.Base implements
if (!plugins.contains(cleanerClass)) {
conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
}
+ if (isReplicationForBulkLoadDataEnabled(conf)) {
+ plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+ cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
+ if (!plugins.contains(cleanerClass)) {
+ conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
+ }
+ }
}
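
For completeness, a hedged sketch of how an operator would turn this code path on.
The patch consults an enable flag (via isReplicationForBulkLoadDataEnabled) and a
cluster id, but the literal key strings below are assumptions drawn from the wider
change rather than this hunk:

    Configuration conf = HBaseConfiguration.create();
    // Assumed keys: enable replication of bulk loaded data and give this
    // cluster a unique id so sinks can locate the source FS configuration.
    conf.setBoolean("hbase.replication.bulkload.enabled", true);
    conf.set("hbase.replication.cluster.id", "cluster-a");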
/*
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index f10f5e3..9e7b3af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -33,15 +33,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* <p>
@@ -78,6 +84,9 @@ public class ReplicationSink {
private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong();
private final Object sharedHtableConLock = new Object();
+ // Total number of hfiles that this sink has replicated
+ private long hfilesReplicated = 0;
+ private SourceFSConfigurationProvider provider;
/**
* Create a sink for replication
@@ -91,6 +100,18 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.metrics = new MetricsSink();
+
+ String className =
+ conf.get("hbase.replication.source.fs.conf.provider",
+ DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+ try {
+ @SuppressWarnings("rawtypes")
+ Class c = Class.forName(className);
+ this.provider = (SourceFSConfigurationProvider) c.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Configured source fs configuration provider class "
+ + className + " could not be instantiated.", e);
+ }
}
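
Because the provider is loaded reflectively from hbase.replication.source.fs.conf.provider,
a custom implementation needs a public no-arg constructor. A minimal hypothetical
implementation, assuming the interface exposes only the getConf(Configuration, String)
method invoked later in replicateEntries:

    // Hypothetical provider that hands back the sink's own configuration for
    // every source cluster. A real provider would load the FS client
    // configuration registered under replicationClusterId.
    public class IdentitySourceFSConfigurationProvider implements SourceFSConfigurationProvider {
      @Override
      public Configuration getConf(Configuration sinkConf, String replicationClusterId)
          throws IOException {
        return sinkConf;
      }
    }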
/**
@@ -113,9 +134,16 @@ public class ReplicationSink {
* operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries
* @param cells
- * @throws IOException
+ * @param replicationClusterId Id that uniquely identifies the source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+ * directory
+ * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
+ * @throws IOException If failed to replicate the data
*/
- public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
+ String replicationClusterId, String sourceBaseNamespaceDirPath,
+ String sourceHFileArchiveDirPath) throws IOException {
if (entries.isEmpty()) return;
if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
// Very simple optimization where we batch sequences of rows going
@@ -126,6 +154,10 @@ public class ReplicationSink {
// invocation of this method per table and cluster id.
Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
+
+ // Map of table name to a list of (family, list of hfile paths) pairs; paths are relative to the namespace directory
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
+
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -138,33 +170,60 @@ public class ReplicationSink {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
- if (isNewRowOrType(previousCell, cell)) {
- // Create new mutation
- m = CellUtil.isDelete(cell)?
- new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
- new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- List<UUID> clusterIds = new ArrayList<UUID>();
- for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
- clusterIds.add(toUUID(clusterId));
+ // Handle bulk load hfiles replication
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
}
- m.setClusterIds(clusterIds);
- addToHashMultiMap(rowMap, table, clusterIds, m);
- }
- if (CellUtil.isDelete(cell)) {
- ((Delete)m).addDeleteMarker(cell);
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
} else {
- ((Put)m).add(cell);
+ // Handle wal replication
+ if (isNewRowOrType(previousCell, cell)) {
+ // Create new mutation
+ m = CellUtil.isDelete(cell)
+ ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+ : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+ clusterIds.add(toUUID(clusterId));
+ }
+ m.setClusterIds(clusterIds);
+ addToHashMultiMap(rowMap, table, clusterIds, m);
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete) m).addDeleteMarker(cell);
+ } else {
+ ((Put) m).add(cell);
+ }
+ previousCell = cell;
}
- previousCell = cell;
}
totalReplicated++;
}
- for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
- batch(entry.getKey(), entry.getValue().values());
+
+ // TODO Replicating mutations and bulk loaded data can be made parallel
+ if (!rowMap.isEmpty()) {
+ LOG.debug("Started replicating mutations.");
+ for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
+ batch(entry.getKey(), entry.getValue().values());
+ }
+ LOG.debug("Finished replicating mutations.");
+ }
+
+ if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+ LOG.debug("Started replicating bulk loaded data.");
+ HFileReplicator hFileReplicator =
+ new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+ sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
+ getConnection());
+ hFileReplicator.replicate();
+ LOG.debug("Finished replicating bulk loaded data.");
}
+
int size = entries.size();
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
- this.metrics.applyBatch(size);
+ this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
@@ -172,6 +231,76 @@ public class ReplicationSink {
}
}
+ private void buildBulkLoadHFileMap(
+ final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
+ Cell cell) throws IOException {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ int storesSize = storesList.size();
+ for (int j = 0; j < storesSize; j++) {
+ StoreDescriptor storeDescriptor = storesList.get(j);
+ List<String> storeFileList = storeDescriptor.getStoreFileList();
+ int storeFilesSize = storeFileList.size();
+ hfilesReplicated += storeFilesSize;
+ for (int k = 0; k < storeFilesSize; k++) {
+ byte[] family = storeDescriptor.getFamilyName().toByteArray();
+
+ // Build hfile relative path from its namespace
+ String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
+
+ String tableName = table.getNameWithNamespaceInclAsString();
+ if (bulkLoadHFileMap.containsKey(tableName)) {
+ List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
+ boolean foundFamily = false;
+ for (int i = 0; i < familyHFilePathsList.size(); i++) {
+ Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
+ if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
+ // Found family already present, just add the path to the existing list
+ familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
+ foundFamily = true;
+ break;
+ }
+ }
+ if (!foundFamily) {
+ // Family not found, add this family and its hfile paths pair to the list
+ addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
+ }
+ } else {
+ // Add this table entry into the map
+ addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
+ }
+ }
+ }
+ }
+
+ private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
+ List<Pair<byte[], List<String>>> familyHFilePathsList) {
+ List<String> hfilePaths = new ArrayList<String>();
+ hfilePaths.add(pathToHfileFromNS);
+ familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
+ }
+
+ private void addNewTableEntryInMap(
+ final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
+ String pathToHfileFromNS, String tableName) {
+ List<String> hfilePaths = new ArrayList<String>();
+ hfilePaths.add(pathToHfileFromNS);
+ Pair<byte[], List<String>> newFamilyHFilePathsPair =
+ new Pair<byte[], List<String>>(family, hfilePaths);
+ List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
+ new ArrayList<Pair<byte[], List<String>>>();
+ newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
+ bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
+ }
+
+ private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
+ byte[] family) {
+ return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
+ .append(table.getQualifierAsString()).append(Path.SEPARATOR)
+ .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
+ .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
+ }
+
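
The relative path built by getHFilePath mirrors the on-disk namespace layout; with
hypothetical inputs (namespace ns1, table t1, encoded region abc123, family f1,
store file hfile1) the result would be:

    // getHFilePath(ns1:t1, bld(abc123), "hfile1", "f1") ->
    //   "ns1/t1/abc123/f1/hfile1"
    //   i.e. <namespace>/<table>/<encoded region>/<family>/<store file>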
/**
* @param previousCell
* @param cell
@@ -241,22 +370,13 @@ public class ReplicationSink {
}
Table table = null;
try {
- // See https://en.wikipedia.org/wiki/Double-checked_locking
- Connection connection = this.sharedHtableCon;
- if (connection == null) {
- synchronized (sharedHtableConLock) {
- connection = this.sharedHtableCon;
- if (connection == null) {
- connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
- }
- }
- }
+ Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
table.batch(rows, null);
}
} catch (InterruptedException ix) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(ix);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
} finally {
if (table != null) {
table.close();
@@ -264,6 +384,20 @@ public class ReplicationSink {
}
}
+ private Connection getConnection() throws IOException {
+ // See https://en.wikipedia.org/wiki/Double-checked_locking
+ Connection connection = sharedHtableCon;
+ if (connection == null) {
+ synchronized (sharedHtableConLock) {
+ connection = sharedHtableCon;
+ if (connection == null) {
+ connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+ }
+ }
+ }
+ return connection;
+ }
+
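
One caveat on the extracted helper: double-checked locking is only safe under the
Java Memory Model when the shared field is volatile. The field declaration is not
visible in this hunk, so as an assumption it should read:

    // Required for the double-checked locking in getConnection() to be safe:
    private volatile Connection sharedHtableCon;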
/**
* Get a string representation of this sink's metrics
* @return string with the total replicated edits count and the date
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3d99523..868ddee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -47,9 +46,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -59,8 +59,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
@@ -223,6 +227,34 @@ public class ReplicationSource extends Thread
}
}
+ @Override
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ String peerId = peerClusterZnode;
+ if (peerId.contains("-")) {
+ // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+ // A peerId will not have "-" in its name, see HBASE-11394
+ peerId = peerClusterZnode.split("-")[0];
+ }
+ Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
+ if (tableCFMap != null) {
+ List<String> tableCfs = tableCFMap.get(tableName);
+ if (tableCFMap.containsKey(tableName)
+ && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+ this.replicationQueues.addHFileRefs(peerId, files);
+ metrics.incrSizeOfHFileRefsQueue(files.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
+ + Bytes.toString(family) + " to peer id " + peerId);
+ }
+ } else {
+ // the user has not defined any table CFs for this peer, so replicate all the data
+ this.replicationQueues.addHFileRefs(peerId, files);
+ metrics.incrSizeOfHFileRefsQueue(files.size());
+ }
+ }
+
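
The peer id extraction leans on the HBASE-11394 guarantee that a peerId never
contains "-"; a quick illustration with a hypothetical recovered-queue znode name:

    // "1-regionserver.example.com,16020,1449735199856" -> "1"
    String peerClusterZnode = "1-regionserver.example.com,16020,1449735199856";
    String peerId = peerClusterZnode.contains("-")
        ? peerClusterZnode.split("-")[0] : peerClusterZnode;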
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
@@ -471,6 +503,8 @@ public class ReplicationSource extends Thread
private int currentSize = 0;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
+ // Current number of hfiles that we need to replicate
+ private long currentNbHFiles = 0;
public ReplicationSourceWorkerThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
@@ -550,6 +584,7 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbOperations = 0;
+ currentNbHFiles = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
currentSize = 0;
try {
@@ -701,6 +736,28 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
+ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+ String peerId = peerClusterZnode;
+ if (peerId.contains("-")) {
+ // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+ // A peerId will not have "-" in its name, see HBASE-11394
+ peerId = peerClusterZnode.split("-")[0];
+ }
+ List<Cell> cells = edit.getCells();
+ for (int i = 0; i < cells.size(); i++) {
+ Cell cell = cells.get(i);
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ List<StoreDescriptor> stores = bld.getStoresList();
+ for (int j = 0; j < stores.size(); j++) {
+ List<String> storeFileList = stores.get(j).getStoreFileList();
+ manager.cleanUpHFileRefs(peerId, storeFileList);
+ metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
+ }
+ }
+ }
+ }
+
/**
* Poll for the next path
* @return true if a path was obtained, false if not
@@ -853,14 +910,31 @@ public class ReplicationSource extends Thread
private int countDistinctRowKeys(WALEdit edit) {
List<Cell> cells = edit.getCells();
int distinctRowKeys = 1;
+ int totalHFileEntries = 0;
Cell lastCell = cells.get(0);
+
for (int i = 0; i < edit.size(); i++) {
+ // Count HFiles to be replicated
+ if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+ List<StoreDescriptor> stores = bld.getStoresList();
+ for (int j = 0; j < stores.size(); j++) {
+ totalHFileEntries += stores.get(j).getStoreFileList().size();
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ + "This its hfiles count will not be added into metric.");
+ }
+ }
+
if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
distinctRowKeys++;
}
lastCell = cells.get(i);
}
- return distinctRowKeys;
+ currentNbHFiles += totalHFileEntries;
+ return distinctRowKeys + totalHFileEntries;
}
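
As a worked example of the new accounting (hypothetical edit): three cells spanning
two distinct rows plus one bulk load descriptor listing four store files make this
method return 2 + 4 = 6, while currentNbHFiles grows by 4 so the shipped-batch
metric can report the hfile count separately.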
/**
@@ -914,6 +988,12 @@ public class ReplicationSource extends Thread
}
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+ // Clean up hfile references
+ int size = entries.size();
+ for (int i = 0; i < size; i++) {
+ cleanUpHFileRefs(entries.get(i).getEdit());
+ }
+ // Log position and clean up old WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
currentWALisBeingWrittenTo);
@@ -925,7 +1005,7 @@ public class ReplicationSource extends Thread
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(currentNbOperations);
// FIXME check relationship between wal group and overall
- metrics.shipBatch(currentNbOperations, currentSize / 1024);
+ metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
if (LOG.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 1e9c714..7f4a9f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -26,7 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -105,4 +108,14 @@ public interface ReplicationSourceInterface {
*/
String getStats();
+ /**
+ * Add hfile names to the queue to be replicated.
+ * @param tableName Name of the table these files belong to
+ * @param family Name of the family these files belong to
+ * @param files Files whose names need to be added to the queue to be replicated
+ * @throws ReplicationException If failed to add hfile references
+ */
+ void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException;
+
}