You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/08/01 04:14:29 UTC
[1/3] hbase git commit: HBASE-14154 DFS Replication should be configurable at column family level
Repository: hbase
Updated Branches:
refs/heads/0.98 9662e0e22 -> 34a1f81ed
refs/heads/branch-1 e3463d6bf -> f27479694
refs/heads/branch-1.2 92dfc8670 -> 92c7bbf31
HBASE-14154 DFS Replication should be configurable at column family level
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f2747969
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f2747969
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f2747969
Branch: refs/heads/branch-1
Commit: f2747969411737f65f5522ee867cc02a74cf310e
Parents: e3463d6
Author: Ashish Singhi <as...@huawei.com>
Authored: Fri Jul 31 17:03:29 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 31 18:19:07 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HColumnDescriptor.java | 31 +++++++++++
.../hadoop/hbase/TestHColumnDescriptor.java | 3 +-
.../hadoop/hbase/TestHTableDescriptor.java | 4 ++
.../hbase/io/hfile/AbstractHFileWriter.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 8 +++
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../org/apache/hadoop/hbase/util/FSUtils.java | 24 ++++----
.../apache/hadoop/hbase/client/TestAdmin1.java | 58 ++++++++++++++++++++
.../hadoop/hbase/client/TestFromClientSide.java | 16 ++++++
.../apache/hadoop/hbase/util/TestFSUtils.java | 6 +-
hbase-shell/src/main/ruby/hbase/admin.rb | 3 +
.../src/main/ruby/shell/commands/create.rb | 1 +
12 files changed, 140 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 47bafc4..ae4656f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -124,6 +124,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String ENCRYPTION = "ENCRYPTION";
public static final String ENCRYPTION_KEY = "ENCRYPTION_KEY";
+ public static final String DFS_REPLICATION = "DFS_REPLICATION";
+ public static final short DEFAULT_DFS_REPLICATION = 0;
+
/**
* Default compression type.
*/
@@ -1526,4 +1529,32 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(Bytes.toBytes(ENCRYPTION_KEY), keyBytes);
return this;
}
+
+ /**
+ * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
+ * <p>
+ * {@link #DEFAULT_DFS_REPLICATION} value indicates that user has explicitly not set any
+ * block replication factor for this CF, hence use the default replication factor set in
+ * the file system.
+ */
+ public short getDFSReplication() {
+ String rf = getValue(DFS_REPLICATION);
+ return rf == null ? DEFAULT_DFS_REPLICATION : Short.valueOf(rf);
+ }
+
+ /**
+ * Set the replication factor to hfile(s) belonging to this family
+ * @param replication number of replicas the blocks(s) belonging to this CF should have, or
+ * {@link #DEFAULT_DFS_REPLICATION} for the default replication factor set in the
+ * filesystem
+ * @return this (for chained invocation)
+ */
+ public HColumnDescriptor setDFSReplication(short replication) {
+ if (replication < 1 && replication != DEFAULT_DFS_REPLICATION) {
+ throw new IllegalArgumentException(
+ "DFS replication factor cannot be less than 1 if explictly set.");
+ }
+ setValue(DFS_REPLICATION, Short.toString(replication));
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index 8e23f97..1966253 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -53,7 +53,7 @@ public class TestHColumnDescriptor {
hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
hcd.setBloomFilterType(BloomType.ROW);
hcd.setCompressionType(Algorithm.SNAPPY);
-
+ hcd.setDFSReplication((short) v);
byte [] bytes = hcd.toByteArray();
HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
@@ -69,6 +69,7 @@ public class TestHColumnDescriptor {
assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
+ assertEquals(v, deserializedHcd.getDFSReplication());
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index 9bf06fb..0e580d8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -231,12 +231,16 @@ public class TestHTableDescriptor {
byte[] familyName = Bytes.toBytes("cf");
HColumnDescriptor hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(1000);
+ hcd.setDFSReplication((short) 3);
htd.addFamily(hcd);
assertEquals(1000, htd.getFamily(familyName).getBlocksize());
+ assertEquals(3, htd.getFamily(familyName).getDFSReplication());
hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(2000);
+ hcd.setDFSReplication((short) 1);
htd.modifyFamily(hcd);
assertEquals(2000, htd.getFamily(familyName).getBlocksize());
+ assertEquals(1, htd.getFamily(familyName).getDFSReplication());
}
@Test(expected=IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index 52491e6..93e1837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -261,6 +261,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
- return FSUtils.create(fs, path, perms, favoredNodes);
+ return FSUtils.create(conf, fs, path, perms, favoredNodes);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb2470c..423deaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1580,6 +1580,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
}
+ // check data replication factor, it can be 0(default value) when user has not explicitly
+ // set the value, in this case we use default replication factor set in the file system.
+ if (hcd.getDFSReplication() < 0) {
+ String message = "HFile Replication for column family " + hcd.getNameAsString()
+ + " must be greater than zero.";
+ warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+ }
+
// TODO: should we check coprocessors and encryption ?
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index ac3e512..8a93c19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -770,7 +770,7 @@ public class HRegionFileSystem {
// First check to get the permissions
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// Write the RegionInfo file content
- FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
try {
out.write(content);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5540569..4c54d27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -362,11 +362,12 @@ public abstract class FSUtils {
* <li>overwrite the file if it exists</li>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
- * <li>use the default replication</li>
+ * <li>use the configured column family replication or default replication if
+ * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
- *
+ * @param conf configurations
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm permissions
@@ -374,23 +375,22 @@ public abstract class FSUtils {
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
- public static FSDataOutputStream create(FileSystem fs, Path path,
+ public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
if (fs instanceof HFileSystem) {
FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
if (backingFs instanceof DistributedFileSystem) {
// Try to use the favoredNodes version via reflection to allow backwards-
// compatibility.
+ short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
+ String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
try {
- return (FSDataOutputStream) (DistributedFileSystem.class
- .getDeclaredMethod("create", Path.class, FsPermission.class,
- boolean.class, int.class, short.class, long.class,
- Progressable.class, InetSocketAddress[].class)
- .invoke(backingFs, path, perm, true,
- getDefaultBufferSize(backingFs),
- getDefaultReplication(backingFs, path),
- getDefaultBlockSize(backingFs, path),
- null, favoredNodes));
+ return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+ Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+ Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+ getDefaultBufferSize(backingFs),
+ replication > 0 ? replication : getDefaultReplication(backingFs, path),
+ getDefaultBlockSize(backingFs, path), null, favoredNodes));
} catch (InvocationTargetException ite) {
// Function was properly called, but threw it's own exception.
throw new IOException(ite.getCause());
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index a2d8690..a365220 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
@@ -1349,4 +1352,59 @@ public class TestAdmin1 {
this.admin.deleteTable(tableName);
}
+
+ /*
+ * Test DFS replication for column families, where one CF has default replication(3) and the other
+ * is set to 1.
+ */
+ @Test(timeout = 300000)
+ public void testHFileReplication() throws Exception {
+ TableName name = TableName.valueOf("testHFileReplication");
+ String fn1 = "rep1";
+ HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+ hcd1.setDFSReplication((short) 1);
+ String fn = "defaultRep";
+ HColumnDescriptor hcd = new HColumnDescriptor(fn);
+ HTableDescriptor htd = new HTableDescriptor(name);
+ htd.addFamily(hcd);
+ htd.addFamily(hcd1);
+ Table table = TEST_UTIL.createTable(htd, null);
+ TEST_UTIL.waitTableAvailable(name);
+ Put p = new Put(Bytes.toBytes("defaultRep_rk"));
+ byte[] q1 = Bytes.toBytes("q1");
+ byte[] v1 = Bytes.toBytes("v1");
+ p.addColumn(Bytes.toBytes(fn), q1, v1);
+ List<Put> puts = new ArrayList<Put>(2);
+ puts.add(p);
+ p = new Put(Bytes.toBytes("rep1_rk"));
+ p.addColumn(Bytes.toBytes(fn1), q1, v1);
+ puts.add(p);
+ try {
+ table.put(puts);
+ admin.flush(name);
+
+ List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+ for (HRegion r : regions) {
+ Store store = r.getStore(Bytes.toBytes(fn));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn));
+ assertTrue("Column family " + fn + " should have 3 copies",
+ FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+ .getFileInfo().getFileStatus().getReplication()));
+ }
+
+ store = r.getStore(Bytes.toBytes(fn1));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn1));
+ assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+ .getFileStatus().getReplication());
+ }
+ }
+ } finally {
+ if (admin.isTableEnabled(name)) {
+ this.admin.disableTable(name);
+ this.admin.deleteTable(name);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 0b793b4..81253a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5581,6 +5581,22 @@ public class TestFromClientSide {
hcd.setScope(0);
checkTableIsLegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Illegal value for setDFSReplication did not throw");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+ // set an illegal DFS replication value by hand
+ hcd.setValue(HColumnDescriptor.DFS_REPLICATION, "-1");
+ checkTableIsIllegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Should throw exception if an illegal value is explicitly being set");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+
// check the conf settings to disable sanity checks
htd.setMemStoreFlushSize(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index c501477..2699292 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -259,7 +259,7 @@ public class TestFSUtils {
// then that the correct file is created
Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
try {
- FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
out.close();
FileStatus stat = fs.getFileStatus(p);
assertEquals(new FsPermission("700"), stat.getPermission());
@@ -281,13 +281,13 @@ public class TestFSUtils {
Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
try {
- FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
out.close();
assertTrue("The created file should be present", FSUtils.isExists(fs, p));
// delete the file with recursion as false. Only the file will be deleted.
FSUtils.delete(fs, p, false);
// Create another file
- FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+ FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
out1.close();
// delete the file with recursion as false. Still the file only will be deleted
FSUtils.delete(fs, p1, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 84cf619..451f924 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -788,6 +788,9 @@ module Hbase
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+ family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION)
arg.each_key do |unknown_key|
puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
http://git-wip-us.apache.org/repos/asf/hbase/blob/f2747969/hbase-shell/src/main/ruby/shell/commands/create.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index ab3a3d1..a5a125e 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+ hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
You can also keep around a reference to the created table:
[2/3] hbase git commit: HBASE-14154 DFS Replication should be configurable at column family level
Posted by ap...@apache.org.
HBASE-14154 DFS Replication should be configurable at column family level
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/92c7bbf3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/92c7bbf3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/92c7bbf3
Branch: refs/heads/branch-1.2
Commit: 92c7bbf310fce99cc0f37806a8b63ec751d31f6f
Parents: 92dfc86
Author: Ashish Singhi <as...@huawei.com>
Authored: Fri Jul 31 17:03:29 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 31 18:19:18 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HColumnDescriptor.java | 31 +++++++++++
.../hadoop/hbase/TestHColumnDescriptor.java | 3 +-
.../hadoop/hbase/TestHTableDescriptor.java | 4 ++
.../hbase/io/hfile/AbstractHFileWriter.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 8 +++
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../org/apache/hadoop/hbase/util/FSUtils.java | 24 ++++----
.../apache/hadoop/hbase/client/TestAdmin1.java | 58 ++++++++++++++++++++
.../hadoop/hbase/client/TestFromClientSide.java | 16 ++++++
.../apache/hadoop/hbase/util/TestFSUtils.java | 6 +-
hbase-shell/src/main/ruby/hbase/admin.rb | 3 +
.../src/main/ruby/shell/commands/create.rb | 1 +
12 files changed, 140 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 47bafc4..ae4656f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -124,6 +124,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String ENCRYPTION = "ENCRYPTION";
public static final String ENCRYPTION_KEY = "ENCRYPTION_KEY";
+ public static final String DFS_REPLICATION = "DFS_REPLICATION";
+ public static final short DEFAULT_DFS_REPLICATION = 0;
+
/**
* Default compression type.
*/
@@ -1526,4 +1529,32 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(Bytes.toBytes(ENCRYPTION_KEY), keyBytes);
return this;
}
+
+ /**
+ * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
+ * <p>
+ * {@link #DEFAULT_DFS_REPLICATION} value indicates that user has explicitly not set any
+ * block replication factor for this CF, hence use the default replication factor set in
+ * the file system.
+ */
+ public short getDFSReplication() {
+ String rf = getValue(DFS_REPLICATION);
+ return rf == null ? DEFAULT_DFS_REPLICATION : Short.valueOf(rf);
+ }
+
+ /**
+ * Set the replication factor to hfile(s) belonging to this family
+ * @param replication number of replicas the blocks(s) belonging to this CF should have, or
+ * {@link #DEFAULT_DFS_REPLICATION} for the default replication factor set in the
+ * filesystem
+ * @return this (for chained invocation)
+ */
+ public HColumnDescriptor setDFSReplication(short replication) {
+ if (replication < 1 && replication != DEFAULT_DFS_REPLICATION) {
+ throw new IllegalArgumentException(
+ "DFS replication factor cannot be less than 1 if explictly set.");
+ }
+ setValue(DFS_REPLICATION, Short.toString(replication));
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index 8e23f97..1966253 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -53,7 +53,7 @@ public class TestHColumnDescriptor {
hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
hcd.setBloomFilterType(BloomType.ROW);
hcd.setCompressionType(Algorithm.SNAPPY);
-
+ hcd.setDFSReplication((short) v);
byte [] bytes = hcd.toByteArray();
HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
@@ -69,6 +69,7 @@ public class TestHColumnDescriptor {
assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
+ assertEquals(v, deserializedHcd.getDFSReplication());
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index 9bf06fb..0e580d8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -231,12 +231,16 @@ public class TestHTableDescriptor {
byte[] familyName = Bytes.toBytes("cf");
HColumnDescriptor hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(1000);
+ hcd.setDFSReplication((short) 3);
htd.addFamily(hcd);
assertEquals(1000, htd.getFamily(familyName).getBlocksize());
+ assertEquals(3, htd.getFamily(familyName).getDFSReplication());
hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(2000);
+ hcd.setDFSReplication((short) 1);
htd.modifyFamily(hcd);
assertEquals(2000, htd.getFamily(familyName).getBlocksize());
+ assertEquals(1, htd.getFamily(familyName).getDFSReplication());
}
@Test(expected=IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index 52491e6..93e1837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -261,6 +261,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
- return FSUtils.create(fs, path, perms, favoredNodes);
+ return FSUtils.create(conf, fs, path, perms, favoredNodes);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb2470c..423deaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1580,6 +1580,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
}
+ // check data replication factor, it can be 0(default value) when user has not explicitly
+ // set the value, in this case we use default replication factor set in the file system.
+ if (hcd.getDFSReplication() < 0) {
+ String message = "HFile Replication for column family " + hcd.getNameAsString()
+ + " must be greater than zero.";
+ warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+ }
+
// TODO: should we check coprocessors and encryption ?
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index ac3e512..8a93c19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -770,7 +770,7 @@ public class HRegionFileSystem {
// First check to get the permissions
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// Write the RegionInfo file content
- FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
try {
out.write(content);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5540569..4c54d27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -362,11 +362,12 @@ public abstract class FSUtils {
* <li>overwrite the file if it exists</li>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
- * <li>use the default replication</li>
+ * <li>use the configured column family replication or default replication if
+ * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
- *
+ * @param conf configurations
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm permissions
@@ -374,23 +375,22 @@ public abstract class FSUtils {
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
- public static FSDataOutputStream create(FileSystem fs, Path path,
+ public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
if (fs instanceof HFileSystem) {
FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
if (backingFs instanceof DistributedFileSystem) {
// Try to use the favoredNodes version via reflection to allow backwards-
// compatibility.
+ short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
+ String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
try {
- return (FSDataOutputStream) (DistributedFileSystem.class
- .getDeclaredMethod("create", Path.class, FsPermission.class,
- boolean.class, int.class, short.class, long.class,
- Progressable.class, InetSocketAddress[].class)
- .invoke(backingFs, path, perm, true,
- getDefaultBufferSize(backingFs),
- getDefaultReplication(backingFs, path),
- getDefaultBlockSize(backingFs, path),
- null, favoredNodes));
+ return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+ Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+ Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+ getDefaultBufferSize(backingFs),
+ replication > 0 ? replication : getDefaultReplication(backingFs, path),
+ getDefaultBlockSize(backingFs, path), null, favoredNodes));
} catch (InvocationTargetException ite) {
// Function was properly called, but threw it's own exception.
throw new IOException(ite.getCause());
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index a2d8690..a365220 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
@@ -1349,4 +1352,59 @@ public class TestAdmin1 {
this.admin.deleteTable(tableName);
}
+
+ /**
+ * Test DFS replication for column families, where one CF keeps the file system default
+ * replication and the other is explicitly set to 1.
+ */
+ @Test(timeout = 300000)
+ public void testHFileReplication() throws Exception {
+ TableName name = TableName.valueOf("testHFileReplication");
+ String fn1 = "rep1";
+ HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+ hcd1.setDFSReplication((short) 1);
+ String fn = "defaultRep";
+ HColumnDescriptor hcd = new HColumnDescriptor(fn);
+ HTableDescriptor htd = new HTableDescriptor(name);
+ htd.addFamily(hcd);
+ htd.addFamily(hcd1);
+ Table table = TEST_UTIL.createTable(htd, null);
+ TEST_UTIL.waitTableAvailable(name);
+ byte[] q1 = Bytes.toBytes("q1");
+ byte[] v1 = Bytes.toBytes("v1");
+ // One row per column family, so that each family has data to flush.
+ List<Put> puts = new ArrayList<Put>(2);
+ puts.add(new Put(Bytes.toBytes("defaultRep_rk")).addColumn(Bytes.toBytes(fn), q1, v1));
+ puts.add(new Put(Bytes.toBytes("rep1_rk")).addColumn(Bytes.toBytes(fn1), q1, v1));
+ try {
+ table.put(puts);
+ admin.flush(name);
+
+ // Every store file of a family must carry that family's replication factor.
+ List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+ for (HRegion r : regions) {
+ Store store = r.getStore(Bytes.toBytes(fn));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn));
+ assertTrue("Column family " + fn + " should use the file system default replication",
+ FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+ .getFileInfo().getFileStatus().getReplication()));
+ }
+
+ store = r.getStore(Bytes.toBytes(fn1));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn1));
+ assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+ .getFileStatus().getReplication());
+ }
+ }
+ } finally {
+ // Release the client-side table handle before dropping the table.
+ table.close();
+ if (admin.isTableEnabled(name)) {
+ this.admin.disableTable(name);
+ this.admin.deleteTable(name);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 0b793b4..81253a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5581,6 +5581,22 @@ public class TestFromClientSide {
hcd.setScope(0);
checkTableIsLegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Illegal value for setDFSReplication did not throw");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+ // set an illegal DFS replication value by hand
+ hcd.setValue(HColumnDescriptor.DFS_REPLICATION, "-1");
+ checkTableIsIllegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Should throw exception if an illegal value is explicitly being set");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+
// check the conf settings to disable sanity checks
htd.setMemStoreFlushSize(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index c501477..2699292 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -259,7 +259,7 @@ public class TestFSUtils {
// then that the correct file is created
Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
try {
- FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
out.close();
FileStatus stat = fs.getFileStatus(p);
assertEquals(new FsPermission("700"), stat.getPermission());
@@ -281,13 +281,13 @@ public class TestFSUtils {
Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
try {
- FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
out.close();
assertTrue("The created file should be present", FSUtils.isExists(fs, p));
// delete the file with recursion as false. Only the file will be deleted.
FSUtils.delete(fs, p, false);
// Create another file
- FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+ FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
out1.close();
// delete the file with recursion as false. Still the file only will be deleted
FSUtils.delete(fs, p1, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index e10e2be..94bc684 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -788,6 +788,9 @@ module Hbase
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+ family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION)
arg.each_key do |unknown_key|
puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
http://git-wip-us.apache.org/repos/asf/hbase/blob/92c7bbf3/hbase-shell/src/main/ruby/shell/commands/create.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index ab3a3d1..a5a125e 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+ hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
You can also keep around a reference to the created table:
[3/3] hbase git commit: HBASE-14154 DFS Replication should be
configurable at column family level
Posted by ap...@apache.org.
HBASE-14154 DFS Replication should be configurable at column family level
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
hbase-shell/src/main/ruby/shell/commands/create.rb
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34a1f81e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34a1f81e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34a1f81e
Branch: refs/heads/0.98
Commit: 34a1f81eda9c944d519bbae2bf3df84c3adeb36c
Parents: 9662e0e
Author: Ashish Singhi <as...@huawei.com>
Authored: Fri Jul 31 17:03:29 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jul 31 18:52:30 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HColumnDescriptor.java | 31 ++++++++++
.../hbase/io/hfile/AbstractHFileWriter.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 8 +++
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../org/apache/hadoop/hbase/util/FSUtils.java | 24 ++++----
.../hadoop/hbase/TestHColumnDescriptor.java | 3 +-
.../apache/hadoop/hbase/client/TestAdmin1.java | 59 ++++++++++++++++++++
.../hadoop/hbase/client/TestFromClientSide.java | 16 ++++++
.../apache/hadoop/hbase/util/TestFSUtils.java | 6 +-
hbase-shell/src/main/ruby/hbase/admin.rb | 3 +
.../src/main/ruby/shell/commands/create.rb | 1 +
11 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index e4fe896..4fde519 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -112,6 +112,9 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
@InterfaceStability.Unstable
public static final String ENCRYPTION_KEY = "ENCRYPTION_KEY";
+ public static final String DFS_REPLICATION = "DFS_REPLICATION";
+ public static final short DEFAULT_DFS_REPLICATION = 0;
+
/**
* Default compression type.
*/
@@ -1392,4 +1395,32 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(Bytes.toBytes(ENCRYPTION_KEY), keyBytes);
return this;
}
+
+ /**
+ * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
+ * <p>
+ * {@link #DEFAULT_DFS_REPLICATION} indicates that the user has not explicitly set any
+ * block replication factor for this CF, hence the default replication factor of the
+ * file system is used.
+ */
+ public short getDFSReplication() {
+ String rf = getValue(DFS_REPLICATION);
+ return rf == null ? DEFAULT_DFS_REPLICATION : Short.parseShort(rf);
+ }
+
+ /**
+ * Set the replication factor for the hfile(s) belonging to this family.
+ * @param replication number of replicas the block(s) belonging to this CF should have, or
+ * {@link #DEFAULT_DFS_REPLICATION} for the default replication factor set in the filesystem
+ * @return this (for chained invocation)
+ * @throws IllegalArgumentException if {@code replication} is negative
+ */
+ public HColumnDescriptor setDFSReplication(short replication) {
+ if (replication < 1 && replication != DEFAULT_DFS_REPLICATION) {
+ throw new IllegalArgumentException(
+ "DFS replication factor cannot be less than 1 if explicitly set.");
+ }
+ setValue(DFS_REPLICATION, Short.toString(replication));
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index b1f7038..8ccd9e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -263,6 +263,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
- return FSUtils.create(fs, path, perms, favoredNodes);
+ return FSUtils.create(conf, fs, path, perms, favoredNodes);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 6d21573..2ff99a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1967,6 +1967,14 @@ MasterServices, Server {
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
}
+ // check data replication factor, it can be 0(default value) when user has not explicitly
+ // set the value, in this case we use default replication factor set in the file system.
+ if (hcd.getDFSReplication() < 0) {
+ String message = "HFile Replication for column family " + hcd.getNameAsString()
+ + " must be greater than zero.";
+ warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+ }
+
// TODO: should we check coprocessors and encryption ?
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index f743a01..d2d34d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -755,7 +755,7 @@ public class HRegionFileSystem {
// First check to get the permissions
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// Write the RegionInfo file content
- FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
try {
out.write(content);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 57eeb7c..8ffb9a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -286,11 +286,12 @@ public abstract class FSUtils {
* <li>overwrite the file if it exists</li>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
- * <li>use the default replication</li>
+ * <li>use the configured column family replication or default replication if
+ * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
- *
+ * @param conf configurations
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm permissions
@@ -298,23 +299,22 @@ public abstract class FSUtils {
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
- public static FSDataOutputStream create(FileSystem fs, Path path,
+ public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
if (fs instanceof HFileSystem) {
FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
if (backingFs instanceof DistributedFileSystem) {
// Try to use the favoredNodes version via reflection to allow backwards-
// compatibility.
+ short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
+ String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
try {
- return (FSDataOutputStream) (DistributedFileSystem.class
- .getDeclaredMethod("create", Path.class, FsPermission.class,
- boolean.class, int.class, short.class, long.class,
- Progressable.class, InetSocketAddress[].class)
- .invoke(backingFs, path, perm, true,
- getDefaultBufferSize(backingFs),
- getDefaultReplication(backingFs, path),
- getDefaultBlockSize(backingFs, path),
- null, favoredNodes));
+ return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+ Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+ Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+ getDefaultBufferSize(backingFs),
+ replication > 0 ? replication : getDefaultReplication(backingFs, path),
+ getDefaultBlockSize(backingFs, path), null, favoredNodes));
} catch (InvocationTargetException ite) {
// Function was properly called, but threw it's own exception.
throw new IOException(ite.getCause());
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index af9c519..c2f65aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -53,7 +53,7 @@ public class TestHColumnDescriptor {
hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
hcd.setBloomFilterType(BloomType.ROW);
hcd.setCompressionType(Algorithm.SNAPPY);
-
+ hcd.setDFSReplication((short) v);
byte [] bytes = hcd.toByteArray();
HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
@@ -69,6 +69,7 @@ public class TestHColumnDescriptor {
assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
+ assertEquals(v, deserializedHcd.getDFSReplication());
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index efdc777..936b68f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -43,6 +43,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -52,6 +55,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -1194,4 +1198,59 @@ public class TestAdmin1 {
this.admin.deleteTable(tableName);
}
+
+ /**
+ * Test DFS replication for column families, where one CF keeps the file system default
+ * replication and the other is explicitly set to 1.
+ */
+ @Test(timeout = 300000)
+ public void testHFileReplication() throws Exception {
+ TableName name = TableName.valueOf("testHFileReplication");
+ String fn1 = "rep1";
+ HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+ hcd1.setDFSReplication((short) 1);
+ String fn = "defaultRep";
+ HColumnDescriptor hcd = new HColumnDescriptor(fn);
+ HTableDescriptor htd = new HTableDescriptor(name);
+ htd.addFamily(hcd);
+ htd.addFamily(hcd1);
+ HTable table = TEST_UTIL.createTable(htd, null);
+ TEST_UTIL.waitTableAvailable(name.getName());
+ byte[] q1 = Bytes.toBytes("q1");
+ byte[] v1 = Bytes.toBytes("v1");
+ // One row per column family, so that each family has data to flush.
+ List<Put> puts = new ArrayList<Put>(2);
+ puts.add(new Put(Bytes.toBytes("defaultRep_rk")).add(Bytes.toBytes(fn), q1, v1));
+ puts.add(new Put(Bytes.toBytes("rep1_rk")).add(Bytes.toBytes(fn1), q1, v1));
+ try {
+ table.put(puts);
+ admin.flush(name.getNameAsString());
+
+ // Every store file of a family must carry that family's replication factor.
+ List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+ for (HRegion r : regions) {
+ Store store = r.getStore(Bytes.toBytes(fn));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn));
+ assertTrue("Column family " + fn + " should use the file system default replication",
+ FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+ .getFileInfo().getFileStatus().getReplication()));
+ }
+
+ store = r.getStore(Bytes.toBytes(fn1));
+ for (StoreFile sf : store.getStorefiles()) {
+ assertTrue(sf.toString().contains(fn1));
+ assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+ .getFileStatus().getReplication());
+ }
+ }
+ } finally {
+ // Release the client-side table handle before dropping the table.
+ table.close();
+ if (admin.isTableEnabled(name)) {
+ this.admin.disableTable(name);
+ this.admin.deleteTable(name);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 7eec037..27b89c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5521,6 +5521,22 @@ public class TestFromClientSide {
hcd.setScope(0);
checkTableIsLegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Illegal value for setDFSReplication did not throw");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+ // set an illegal DFS replication value by hand
+ hcd.setValue(HColumnDescriptor.DFS_REPLICATION, "-1");
+ checkTableIsIllegal(htd);
+ try {
+ hcd.setDFSReplication((short) -1);
+ fail("Should throw exception if an illegal value is explicitly being set");
+ } catch (IllegalArgumentException e) {
+ // pass
+ }
+
// check the conf settings to disable sanity checks
htd.setMemStoreFlushSize(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 8a6be4b..2b5b536 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -259,7 +259,7 @@ public class TestFSUtils {
// then that the correct file is created
Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
try {
- FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
out.close();
FileStatus stat = fs.getFileStatus(p);
assertEquals(new FsPermission("700"), stat.getPermission());
@@ -281,13 +281,13 @@ public class TestFSUtils {
Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
try {
- FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+ FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
out.close();
assertTrue("The created file should be present", FSUtils.isExists(fs, p));
// delete the file with recursion as false. Only the file will be deleted.
FSUtils.delete(fs, p, false);
// Create another file
- FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+ FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
out1.close();
// delete the file with recursion as false. Still the file only will be deleted
FSUtils.delete(fs, p1, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 151a566..00708e9 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -766,6 +766,9 @@ module Hbase
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+ family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+ HColumnDescriptor::DFS_REPLICATION)
arg.each_key do |unknown_key|
puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
http://git-wip-us.apache.org/repos/asf/hbase/blob/34a1f81e/hbase-shell/src/main/ruby/shell/commands/create.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index cd0fdea..9739bea 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+ hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
You can also keep around a reference to the created table: