You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:13 UTC
[06/20] hbase git commit: HBASE-20115 Reimplement serial replication
based on the new replication storage layer
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
new file mode 100644
index 0000000..31c3ac7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds a batch of WAL entries to replicate, along with some statistics about the batch.
+ */
+@InterfaceAudience.Private
+class WALEntryBatch {
+  // entries collected for this batch, kept in the order they were added
+  private final List<Entry> walEntries;
+  // last WAL that was read
+  private final Path lastWalPath;
+  // position in WAL of last entry in this batch
+  private long lastWalPosition = 0;
+  // number of distinct row keys in this batch
+  private int nbRowKeys = 0;
+  // number of HFiles
+  private int nbHFiles = 0;
+  // heap size of data we need to replicate
+  private long heapSize = 0;
+  // save the last sequenceid for each region if the table has serial-replication scope
+  private final Map<String, Long> lastSeqIds = new HashMap<>();
+
+  /**
+   * @param maxNbEntries initial capacity reserved for the list of entries
+   * @param lastWalPath Path of the WAL the last entry in this batch was read from
+   */
+  WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+    this.walEntries = new ArrayList<>(maxNbEntries);
+    this.lastWalPath = lastWalPath;
+  }
+
+  /**
+   * Appends the given entry to this batch.
+   * @param entry the WAL entry to add
+   */
+  public void addEntry(Entry entry) {
+    walEntries.add(entry);
+  }
+
+  /**
+   * @return the WAL Entries.
+   */
+  public List<Entry> getWalEntries() {
+    return walEntries;
+  }
+
+  /**
+   * @return the path of the last WAL that was read.
+   */
+  public Path getLastWalPath() {
+    return lastWalPath;
+  }
+
+  /**
+   * @return the position in the last WAL that was read.
+   */
+  public long getLastWalPosition() {
+    return lastWalPosition;
+  }
+
+  /**
+   * @param lastWalPosition the position in the last WAL that was read
+   */
+  public void setLastWalPosition(long lastWalPosition) {
+    this.lastWalPosition = lastWalPosition;
+  }
+
+  /**
+   * @return the number of WAL entries currently held by this batch
+   */
+  public int getNbEntries() {
+    return walEntries.size();
+  }
+
+  /**
+   * @return the number of distinct row keys in this batch
+   */
+  public int getNbRowKeys() {
+    return nbRowKeys;
+  }
+
+  /**
+   * @return the number of HFiles in this batch
+   */
+  public int getNbHFiles() {
+    return nbHFiles;
+  }
+
+  /**
+   * @return total number of operations in this batch, i.e. row keys plus HFiles
+   */
+  public int getNbOperations() {
+    return getNbRowKeys() + getNbHFiles();
+  }
+
+  /**
+   * @return the heap size of this batch
+   */
+  public long getHeapSize() {
+    return heapSize;
+  }
+
+  /**
+   * @return the last sequenceid for each region if the table has serial-replication scope
+   */
+  public Map<String, Long> getLastSeqIds() {
+    return lastSeqIds;
+  }
+
+  /**
+   * @param increment amount to add to the row key counter
+   */
+  public void incrementNbRowKeys(int increment) {
+    nbRowKeys += increment;
+  }
+
+  /**
+   * @param increment amount to add to the HFile counter
+   */
+  public void incrementNbHFiles(int increment) {
+    nbHFiles += increment;
+  }
+
+  /**
+   * @param increment amount to add to the recorded heap size
+   */
+  public void incrementHeapSize(long increment) {
+    heapSize += increment;
+  }
+
+  /**
+   * Records the sequence id for the given region, replacing any value stored for it before.
+   * @param region key identifying the region
+   * @param sequenceId the sequence id to remember for that region
+   */
+  public void setLastSeqId(String region, long sequenceId) {
+    lastSeqIds.put(region, sequenceId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 7c83c0c..bcab9b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
@@ -102,16 +99,18 @@ class WALEntryStream implements Closeable {
}
/**
- * @return the next WAL entry in this stream
- * @throws IOException
- * @throws NoSuchElementException if no more entries in the stream.
+ * Returns the next WAL entry in this stream but does not advance.
+ */
+ public Entry peek() throws IOException {
+   // hasNext() is consulted first; the current entry is only handed out when it reports true
+   if (!hasNext()) {
+     return null;
+   }
+   return currentEntry;
+ }
+
+ /**
+ * Returns the next WAL entry in this stream and advance the stream.
*/
public Entry next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- Entry save = currentEntry;
- currentEntry = null; // gets reloaded by hasNext()
+ Entry save = peek();
+ currentEntry = null;
return save;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index a67bca1..85292f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -170,6 +170,14 @@ public class FSTableDescriptors implements TableDescriptors {
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
+ .setMaxVersions(HConstants.ALL_VERSIONS)
+ .setInMemory(true)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+ // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+ .setBloomFilterType(BloomType.NONE)
+ .build())
.setCoprocessor(CoprocessorDescriptorBuilder.newBuilder(
MultiRowMutationEndpoint.class.getName())
.setPriority(Coprocessor.PRIORITY_SYSTEM)
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index c0b72aa..b106a31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -18,13 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
-
import edu.umd.cs.findbugs.annotations.CheckForNull;
-
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
@@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
@@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
@@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
* Utility methods for interacting with the underlying file system.
@@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils {
return regionDirs;
}
+ /**
+  * Builds the path of a region directory below the given table directory. The directory name
+  * is the encoded name of the region info returned by
+  * {@link ServerRegionReplicaUtil#getRegionInfoForFs(RegionInfo)} for {@code region}.
+  * @param tableDir the table directory that is the parent of the returned path
+  * @param region the region whose directory path is wanted
+  * @return the path of the region directory under {@code tableDir}
+  */
+ public static Path getRegionDir(Path tableDir, RegionInfo region) {
+   String encodedName = ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName();
+   return new Path(tableDir, encodedName);
+ }
+
/**
* Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index c1a77ee..ac23d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey {
this.replicationScope = replicationScope;
}
- public void serializeReplicationScope(boolean serialize) {
- if (!serialize) {
- setReplicationScope(null);
+ public void clearReplicationScope() {
+ setReplicationScope(null);
+ }
+
+ public boolean hasSerialReplicationScope() {
+ if (replicationScope == null || replicationScope.isEmpty()) {
+ return false;
}
+ return replicationScope.values().stream()
+ .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index ec93207..9161e25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -494,7 +494,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -535,7 +535,8 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3);
+ MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0,
+ 3);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -682,8 +683,8 @@ public class TestMetaTableAccessor {
EnvironmentEdgeManager.injectEdge(edge);
try {
// now merge the regions, effectively deleting the rows for region a and b.
- MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn,
- 1);
+ MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB,
+ -1L, sn, 1);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -776,7 +777,8 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(),
+ 1);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
@@ -813,7 +815,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index a4e8e19..e00f072 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -194,7 +194,7 @@ public class TestHRegionFileSystem {
@Test
public void testOnDiskRegionCreation() throws IOException {
- Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@@ -226,7 +226,7 @@ public class TestHRegionFileSystem {
@Test
public void testNonIdempotentOpsWithRetries() throws IOException {
- Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@@ -235,19 +235,15 @@ public class TestHRegionFileSystem {
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
assertTrue(fs.exists(regionFs.getRegionDir()));
- regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
- null, null);
- // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
- // hri);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri);
boolean result = regionFs.createDir(new Path("/foo/bar"));
assertTrue("Couldn't create the directory", result);
-
- regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
assertTrue("Couldn't rename the directory", result);
- regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.deleteDir(new Path("/foo/bar"));
assertTrue("Couldn't delete the directory", result);
fs.delete(rootDir, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 50dffd5..fab6512 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -343,12 +343,12 @@ public class TestRegionServerMetrics {
@Test
public void testStoreCount() throws Exception {
- //Force a hfile.
+ // Force a hfile.
doNPuts(1, false);
TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
- assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
+ assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
assertGauge("storeFileCount", 1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index ffa03a2..e9e92b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
@@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Category(LargeTests.class)
+@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {
@ClassRule
@@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
- /**
- * @throws java.lang.Exception
- */
@Before
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
new file mode 100644
index 0000000..1408cf0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Creates a table with a serial-replication scoped column family, moves its region to another
+ * region server between two rounds of puts, and then checks that the entries handed to the
+ * replication endpoint never go backwards in sequence id.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialReplication.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String PEER_ID = "1";
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+
+  private static FileSystem FS;
+
+  private static Path LOG_DIR;
+
+  // Writer for the per-test log file; recreated in setUp() and closed in tearDown().
+  private static WALProvider.Writer WRITER;
+
+  /**
+   * Replication endpoint that, instead of shipping edits to a peer cluster, appends every entry
+   * it receives to the shared {@link #WRITER} so the test can read them back in arrival order.
+   */
+  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private static final UUID PEER_UUID = UUID.randomUUID();
+
+    @Override
+    public UUID getPeerUUID() {
+      return PEER_UUID;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      // Callers are serialized on the shared writer so each batch is appended and synced as a
+      // whole.
+      synchronized (WRITER) {
+        try {
+          for (Entry entry : replicateContext.getEntries()) {
+            WRITER.append(entry);
+          }
+          WRITER.sync(false);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+    UTIL.startMiniCluster(3);
+    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+    FS = UTIL.getTestFileSystem();
+    FS.mkdirs(LOG_DIR);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Rule
+  public final TestName name = new TestName();
+
+  private Path logPath;
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    UTIL.ensureSomeRegionServersAvailable(3);
+    logPath = new Path(LOG_DIR, name.getMethodName());
+    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+    // Add the peer in disabled state, so that when it is enabled later all sources start
+    // pushing together.
+    UTIL.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+      false);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    try {
+      UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    } finally {
+      // Close the writer even when removing the peer fails, so it is not leaked into the next
+      // test.
+      if (WRITER != null) {
+        WRITER.close();
+        WRITER = null;
+      }
+    }
+  }
+
+  @Test
+  public void testRegionMove() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    // first round of puts, written before the region is moved
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(rs.getServerName().getServerName()));
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !rs.getRegions(tableName).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return region + " is still not on " + rs;
+      }
+    });
+    // second round of puts, written after the region has been moved
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 100; i < 200; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+    // wait until at least 200 entries can be read back from the log written by the endpoint
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+          int count = 0;
+          while (reader.next() != null) {
+            count++;
+          }
+          return count >= 200;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Not enough entries replicated";
+      }
+    });
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      long seqId = -1L;
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        long entrySeqId = entry.getKey().getSequenceId();
+        assertTrue("Sequence id go backwards from " + seqId + " to " + entrySeqId,
+          entrySeqId >= seqId);
+        // Remember the id just verified; without this every entry would be compared against
+        // the initial -1 and the ordering check could never fail.
+        seqId = entrySeqId;
+        count++;
+      }
+      assertEquals(200, count);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a53cba3..6d75fec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, null, false);
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
new file mode 100644
index 0000000..c8387c5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Exercises {@link SerialReplicationChecker#canPush} against a single region server mini cluster.
+ * Region state and replication barriers are written straight into the meta table, and sequence
+ * ids are recorded through a {@link ReplicationQueueStorage}, so each scenario can be set up
+ * without actually moving regions around.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationChecker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // Peer id reported by the mocked source and used as the queue id in the queue storage.
+  private static final String PEER_ID = "1";
+
+  // Assigned once in setUpBeforeClass, after the mini cluster has been started.
+  private static ReplicationQueueStorage QUEUE_STORAGE;
+
+  // Name of the WAL registered in the queue storage; no WAL file is written by this test.
+  private static final String WAL_FILE_NAME = "test.wal";
+
+  private SerialReplicationChecker checker;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  // Derived from the test method name in setUp.
+  private TableName tableName;
+
+  /**
+   * Starts a one node mini cluster, creates the queue storage on top of its ZooKeeper watcher and
+   * registers {@link #WAL_FILE_NAME} for {@link #PEER_ID} under the first region server.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(1);
+    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+      UTIL.getConfiguration());
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
+      WAL_FILE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Builds a fresh checker around a mocked {@link ReplicationSource} that only answers the peer
+   * id, the queue storage and a server exposing the mini cluster connection.
+   */
+  @Before
+  public void setUp() throws IOException {
+    ReplicationSource source = mock(ReplicationSource.class);
+    when(source.getPeerId()).thenReturn(PEER_ID);
+    when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    Server server = mock(Server.class);
+    when(server.getConnection()).thenReturn(UTIL.getConnection());
+    when(source.getServer()).thenReturn(server);
+    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
+    tableName = TableName.valueOf(name.getMethodName());
+  }
+
+  /**
+   * Creates a mocked WAL entry whose key reports the current test table, the encoded name of the
+   * given region and the given sequence id.
+   */
+  private Entry createEntry(RegionInfo region, long seqId) {
+    WALKeyImpl key = mock(WALKeyImpl.class);
+    when(key.getTableName()).thenReturn(tableName);
+    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
+    when(key.getSequenceId()).thenReturn(seqId);
+    Entry entry = mock(Entry.class);
+    when(entry.getKey()).thenReturn(key);
+    return entry;
+  }
+
+  /**
+   * Creates a Put cell whose row is the start key of the given region.
+   */
+  private Cell createCell(RegionInfo region) {
+    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
+      .setType(Type.Put).build();
+  }
+
+  @Test
+  public void testNoBarrierCanPush() throws IOException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    // nothing has been written to meta for this region
+    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
+  }
+
+  /**
+   * Writes the given state, plus zero or more replication barriers, to the region's row in the
+   * meta table with a single Put.
+   * @param region the region whose meta row is updated
+   * @param state the state stored in the catalog family, by name
+   * @param barriers barrier sequence ids; may be empty, in which case only the state is written
+   */
+  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+      throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+      Bytes.toBytes(state.name()));
+    for (int i = 0; i < barriers.length; i++) {
+      // All barriers share one column, so each one is written with its own timestamp (the put
+      // timestamp minus its index) to keep them from landing on the same cell.
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  /**
+   * Writes only the given state to the region's row in the meta table, leaving the barriers
+   * untouched.
+   */
+  private void setState(RegionInfo region, RegionState.State state) throws IOException {
+    // A state update is a state-and-barrier update with no barriers.
+    addStateAndBarrier(region, state);
+  }
+
+  /**
+   * Stores {@code seqId} for the region (keyed by its encoded name) in the queue storage, together
+   * with WAL position 10 for {@link #WAL_FILE_NAME}.
+   */
+  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+  }
+
+  @Test
+  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(region, RegionState.State.OPEN, 10);
+    Cell cell = createCell(region);
+    // can push since we are in the first range
+    assertTrue(checker.canPush(createEntry(region, 100), cell));
+    setState(region, RegionState.State.OPENING);
+    // can not push since we are in the last range and the state is OPENING
+    assertFalse(checker.canPush(createEntry(region, 102), cell));
+    addStateAndBarrier(region, RegionState.State.OPEN, 50);
+    // can not push since the previous range has not been finished yet
+    assertFalse(checker.canPush(createEntry(region, 102), cell));
+    updatePushedSeqId(region, 49);
+    // can push since the previous range has been finished
+    assertTrue(checker.canPush(createEntry(region, 102), cell));
+    setState(region, RegionState.State.OPENING);
+    // the state is OPENING again, so a later entry must not be pushed
+    assertFalse(checker.canPush(createEntry(region, 104), cell));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2146e47..eb7d5a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NavigableMap;
-import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -180,15 +180,12 @@ public class TestWALEntryStream {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
- WAL.Entry entry = entryStream.next();
+ WAL.Entry entry = entryStream.peek();
+ assertSame(entry, entryStream.next());
assertNotNull(entry);
assertFalse(entryStream.hasNext());
- try {
- entry = entryStream.next();
- fail();
- } catch (NoSuchElementException e) {
- // expected
- }
+ assertNull(entryStream.peek());
+ assertNull(entryStream.next());
oldPos = entryStream.getPosition();
}
@@ -346,10 +343,12 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ Server mockServer= Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
+ when(source.getServer()).thenReturn(mockServer);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();