You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/07 06:45:45 UTC
[09/29] hbase git commit: HBASE-19083 Introduce a new log writer
which can write to two HDFSes
HBASE-19083 Introduce a new log writer which can write to two HDFSes
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee19608b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee19608b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee19608b
Branch: refs/heads/HBASE-19064
Commit: ee19608b2a25526e0969b9f0dd99f3b9a3ebb797
Parents: 971f535
Author: zhangduo <zh...@apache.org>
Authored: Thu Jan 11 21:08:02 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon May 7 10:43:47 2018 +0800
----------------------------------------------------------------------
.../hbase/regionserver/wal/AsyncFSWAL.java | 21 +--
.../regionserver/wal/CombinedAsyncWriter.java | 134 ++++++++++++++++++
.../hbase/regionserver/wal/DualAsyncFSWAL.java | 67 +++++++++
.../wal/AbstractTestProtobufLog.java | 110 +++------------
.../regionserver/wal/ProtobufLogTestHelper.java | 99 ++++++++++++++
.../regionserver/wal/TestAsyncProtobufLog.java | 32 +----
.../wal/TestCombinedAsyncWriter.java | 136 +++++++++++++++++++
.../hbase/regionserver/wal/TestProtobufLog.java | 14 +-
.../regionserver/wal/WriterOverAsyncWriter.java | 63 +++++++++
9 files changed, 533 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index e34818f..0bee9d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -607,12 +607,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
- @Override
- protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
channelClass);
}
+ @Override
+ protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ return createAsyncWriter(fs, path);
+ }
+
private void waitForSafePoint() {
consumeLock.lock();
try {
@@ -632,13 +636,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
- private long closeWriter() {
- AsyncWriter oldWriter = this.writer;
- if (oldWriter != null) {
- long fileLength = oldWriter.getLength();
+ protected final long closeWriter(AsyncWriter writer) {
+ if (writer != null) {
+ long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
- oldWriter.close();
+ writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
@@ -654,7 +657,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
- long oldFileLen = closeWriter();
+ long oldFileLen = closeWriter(this.writer);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
@@ -679,7 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
- closeWriter();
+ closeWriter(this.writer);
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
new file mode 100644
index 0000000..8ecfede
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
+ */
+@InterfaceAudience.Private
+public abstract class CombinedAsyncWriter implements AsyncWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
+
+ protected final ImmutableList<AsyncWriter> writers;
+
+ protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
+ this.writers = writers;
+ }
+
+ @Override
+ public long getLength() {
+ return writers.get(0).getLength();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Exception error = null;
+ for (AsyncWriter writer : writers) {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ LOG.warn("close writer failed", e);
+ if (error == null) {
+ error = e;
+ }
+ }
+ }
+ if (error != null) {
+ throw new IOException("Failed to close at least one writer, please see the warn log above. " +
+ "The cause is the first exception occured", error);
+ }
+ }
+
+ protected abstract void doSync(CompletableFuture<Long> future);
+
+ @Override
+ public CompletableFuture<Long> sync() {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ doSync(future);
+ return future;
+ }
+
+ @Override
+ public void append(Entry entry) {
+ writers.forEach(w -> w.append(entry));
+ }
+
+ public enum Mode {
+ SEQUENTIAL, PARALLEL
+ }
+
+ public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
+ ImmutableList<AsyncWriter> ws =
+ ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build();
+ switch (mode) {
+ case SEQUENTIAL:
+ return new CombinedAsyncWriter(ws) {
+
+ private void doSync(CompletableFuture<Long> future, Long length, int index) {
+ if (index == writers.size()) {
+ future.complete(length);
+ return;
+ }
+ writers.get(index).sync().whenComplete((len, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ doSync(future, len, index + 1);
+ });
+ }
+
+ @Override
+ protected void doSync(CompletableFuture<Long> future) {
+ doSync(future, null, 0);
+ }
+ };
+ case PARALLEL:
+ return new CombinedAsyncWriter(ws) {
+
+ @Override
+ protected void doSync(CompletableFuture<Long> future) {
+ AtomicInteger remaining = new AtomicInteger(writers.size());
+ writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (remaining.decrementAndGet() == 0) {
+ future.complete(length);
+ }
+ }));
+ }
+ };
+ default:
+ throw new IllegalArgumentException("Unknown mode: " + mode);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
new file mode 100644
index 0000000..42b0dae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * An AsyncFSWAL which writes data to two filesystems.
+ */
+@InterfaceAudience.Private
+public class DualAsyncFSWAL extends AsyncFSWAL {
+
+ private final FileSystem remoteFs;
+
+ private final Path remoteWalDir;
+
+ public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteRootDir,
+ String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+ boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
+ Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+ super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+ eventLoopGroup, channelClass);
+ this.remoteFs = remoteFs;
+ this.remoteWalDir = new Path(remoteRootDir, logDir);
+ }
+
+ @Override
+ protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ AsyncWriter localWriter = super.createWriterInstance(path);
+ AsyncWriter remoteWriter;
+ boolean succ = false;
+ try {
+ remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName()));
+ succ = true;
+ } finally {
+ if (!succ) {
+ closeWriter(localWriter);
+ }
+ }
+ return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
+ localWriter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index c3f3277..5098609 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -18,33 +18,15 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -56,8 +38,8 @@ import org.junit.rules.TestName;
/**
* WAL tests that can be reused across providers.
*/
-public abstract class AbstractTestProtobufLog<W extends Closeable> {
- protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+public abstract class AbstractTestProtobufLog {
+ protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected FileSystem fs;
protected Path dir;
@@ -93,14 +75,7 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
// faster failover with cluster.shutdown();fs.close() idiom
- TEST_UTIL.getConfiguration()
- .setInt("hbase.ipc.client.connect.max.retries", 1);
- TEST_UTIL.getConfiguration().setInt(
- "dfs.client.block.recovery.retries", 1);
- TEST_UTIL.getConfiguration().setInt(
- "hbase.ipc.client.connection.maxidletime", 500);
- TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
- SampleRegionWALCoprocessor.class.getName());
+ TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
TEST_UTIL.startMiniDFSCluster(3);
}
@@ -131,77 +106,24 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
* @throws IOException
*/
private void doRead(boolean withTrailer) throws IOException {
- final int columnCount = 5;
- final int recordCount = 5;
- final TableName tableName =
- TableName.valueOf("tablename");
- final byte[] row = Bytes.toBytes("row");
+ int columnCount = 5;
+ int recordCount = 5;
+ TableName tableName = TableName.valueOf("tablename");
+ byte[] row = Bytes.toBytes("row");
long timestamp = System.currentTimeMillis();
Path path = new Path(dir, "tempwal");
// delete the log if already exists, for test only
fs.delete(path, true);
- W writer = null;
- ProtobufLogReader reader = null;
- try {
- HRegionInfo hri = new HRegionInfo(tableName,
- HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- HTableDescriptor htd = new HTableDescriptor(tableName);
- fs.mkdirs(dir);
- // Write log in pb format.
- writer = createWriter(path);
- for (int i = 0; i < recordCount; ++i) {
- WALKeyImpl key = new WALKeyImpl(
- hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
- WALEdit edit = new WALEdit();
- for (int j = 0; j < columnCount; ++j) {
- if (i == 0) {
- htd.addFamily(new HColumnDescriptor("column" + j));
- }
- String value = i + "" + j;
- edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
- }
- append(writer, new WAL.Entry(key, edit));
- }
- sync(writer);
- if (withTrailer) writer.close();
-
- // Now read the log using standard means.
- reader = (ProtobufLogReader) wals.createReader(fs, path);
- if (withTrailer) {
- assertNotNull(reader.trailer);
- } else {
- assertNull(reader.trailer);
- }
- for (int i = 0; i < recordCount; ++i) {
- WAL.Entry entry = reader.next();
- assertNotNull(entry);
- assertEquals(columnCount, entry.getEdit().size());
- assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
- assertEquals(tableName, entry.getKey().getTableName());
- int idx = 0;
- for (Cell val : entry.getEdit().getCells()) {
- assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
- val.getRowLength()));
- String value = i + "" + idx;
- assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
- idx++;
- }
- }
- WAL.Entry entry = reader.next();
- assertNull(entry);
- } finally {
- if (writer != null) {
- writer.close();
- }
- if (reader != null) {
- reader.close();
+ fs.mkdirs(dir);
+ try (WALProvider.Writer writer = createWriter(path)) {
+ ProtobufLogTestHelper.doWrite(writer, withTrailer, tableName, columnCount, recordCount, row,
+ timestamp);
+ try (ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path)) {
+ ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
+ timestamp);
}
}
}
- protected abstract W createWriter(Path path) throws IOException;
-
- protected abstract void append(W writer, WAL.Entry entry) throws IOException;
-
- protected abstract void sync(W writer) throws IOException;
+ protected abstract WALProvider.Writer createWriter(Path path) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
new file mode 100644
index 0000000..aece961
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+
+/**
+ * Helper class for testing protobuf log.
+ */
+final class ProtobufLogTestHelper {
+
+ private ProtobufLogTestHelper() {
+ }
+
+ private static byte[] toValue(int prefix, int suffix) {
+ return Bytes.toBytes(prefix + "-" + suffix);
+ }
+
+ private static RegionInfo toRegionInfo(TableName tableName) {
+ return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build();
+ }
+
+ public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName,
+ int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
+ RegionInfo hri = toRegionInfo(tableName);
+ for (int i = 0; i < recordCount; i++) {
+ WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
+ HConstants.DEFAULT_CLUSTER_ID);
+ WALEdit edit = new WALEdit();
+ int prefix = i;
+ IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
+ .map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add);
+ writer.append(new WAL.Entry(key, edit));
+ }
+ writer.sync(false);
+ if (withTrailer) {
+ writer.close();
+ }
+ }
+
+ public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName,
+ int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
+ if (withTrailer) {
+ assertNotNull(reader.trailer);
+ } else {
+ assertNull(reader.trailer);
+ }
+ RegionInfo hri = toRegionInfo(tableName);
+ for (int i = 0; i < recordCount; ++i) {
+ WAL.Entry entry = reader.next();
+ assertNotNull(entry);
+ assertEquals(columnCount, entry.getEdit().size());
+ assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+ assertEquals(tableName, entry.getKey().getTableName());
+ int idx = 0;
+ for (Cell val : entry.getEdit().getCells()) {
+ assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
+ val.getRowLength()));
+ assertArrayEquals(toValue(i, idx), CellUtil.cloneValue(val));
+ idx++;
+ }
+ }
+ assertNull(reader.next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index 0ea75b6..7626dcf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -18,29 +18,24 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> {
+public class TestAsyncProtobufLog extends AbstractTestProtobufLog {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -64,25 +59,8 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
}
@Override
- protected AsyncWriter createWriter(Path path) throws IOException {
- return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false,
- EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
- }
-
- @Override
- protected void append(AsyncWriter writer, Entry entry) throws IOException {
- writer.append(entry);
- }
-
- @Override
- protected void sync(AsyncWriter writer) throws IOException {
- try {
- writer.sync().get();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new IOException(e.getCause());
- }
+ protected Writer createWriter(Path path) throws IOException {
+ return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter(
+ TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
new file mode 100644
index 0000000..cb8edc6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCombinedAsyncWriter {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static EventLoopGroup EVENT_LOOP_GROUP;
+
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
+ private static WALFactory WALS;
+
+ @Rule
+ public final TestName name = new TestName();
+
+ @Parameter
+ public CombinedAsyncWriter.Mode mode;
+
+ @Parameters(name = "{index}: mode={0}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL },
+ new Object[] { CombinedAsyncWriter.Mode.PARALLEL });
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ EVENT_LOOP_GROUP = new NioEventLoopGroup();
+ CHANNEL_CLASS = NioSocketChannel.class;
+ UTIL.startMiniDFSCluster(3);
+ UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS());
+ WALS =
+ new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (WALS != null) {
+ WALS.close();
+ }
+ EVENT_LOOP_GROUP.shutdownGracefully().syncUninterruptibly();
+ UTIL.shutdownMiniDFSCluster();
+ }
+
+ @Test
+ public void testWithTrailer() throws IOException {
+ doTest(true);
+ }
+
+ @Test
+ public void testWithoutTrailer() throws IOException {
+ doTest(false);
+ }
+
+ private Path getPath(int index) throws IOException {
+ String methodName = name.getMethodName().replaceAll("[^A-Za-z0-9_-]", "_");
+ return new Path(UTIL.getDataTestDirOnTestFS(), methodName + "-" + index);
+ }
+
+ private void doTest(boolean withTrailer) throws IOException {
+ int columnCount = 5;
+ int recordCount = 5;
+ TableName tableName = TableName.valueOf("tablename");
+ byte[] row = Bytes.toBytes("row");
+ long timestamp = System.currentTimeMillis();
+ Path path1 = getPath(1);
+ Path path2 = getPath(2);
+ FileSystem fs = UTIL.getTestFileSystem();
+ Configuration conf = UTIL.getConfiguration();
+ try (
+ AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false,
+ EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
+ AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
+ EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
+ CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) {
+ ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
+ columnCount, recordCount, row, timestamp);
+ try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {
+ ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
+ timestamp);
+ }
+ try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path2)) {
+ ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
+ timestamp);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
index 2d938d4..d429a01 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
@@ -23,14 +23,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
-public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> {
+public class TestProtobufLog extends AbstractTestProtobufLog {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -40,14 +38,4 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer>
protected Writer createWriter(Path path) throws IOException {
return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
}
-
- @Override
- protected void append(Writer writer, Entry entry) throws IOException {
- writer.append(entry);
- }
-
- @Override
- protected void sync(Writer writer) throws IOException {
- writer.sync(false);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee19608b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
new file mode 100644
index 0000000..9d938b0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+class WriterOverAsyncWriter implements WALProvider.Writer {
+
+ private final WALProvider.AsyncWriter asyncWriter;
+
+ public WriterOverAsyncWriter(AsyncWriter asyncWriter) {
+ this.asyncWriter = asyncWriter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ asyncWriter.close();
+ }
+
+ @Override
+ public long getLength() {
+ return asyncWriter.getLength();
+ }
+
+ @Override
+ public void append(Entry entry) throws IOException {
+ asyncWriter.append(entry);
+ }
+
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ try {
+ asyncWriter.sync().get();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new IOException(e.getCause());
+ }
+ }
+}