You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/05/15 20:31:50 UTC

[2/2] hbase git commit: HBASE-13035 Backport HBASE-12867 Shell does not support custom replication endpoint specification

HBASE-13035 Backport HBASE-12867 Shell does not support custom replication endpoint specification


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/31fb0583
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/31fb0583
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/31fb0583

Branch: refs/heads/branch-1.0
Commit: 31fb058394f70f1742827ce3bdc97695fb3eae81
Parents: e36876b
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu May 14 12:37:42 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri May 15 11:04:15 2015 -0700

----------------------------------------------------------------------
 hbase-shell/src/main/ruby/hbase.rb              |   5 +
 .../src/main/ruby/hbase/replication_admin.rb    |  69 ++++++-
 .../src/main/ruby/shell/commands/add_peer.rb    |  38 +++-
 .../test/ruby/hbase/replication_admin_test.rb   | 191 +++++++++++++++++++
 hbase-shell/src/test/ruby/test_helper.rb        |   4 +
 5 files changed, 296 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/31fb0583/hbase-shell/src/main/ruby/hbase.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb
index 121fd53..304afaf 100644
--- a/hbase-shell/src/main/ruby/hbase.rb
+++ b/hbase-shell/src/main/ruby/hbase.rb
@@ -65,6 +65,11 @@ module HBaseConstants
   AUTHORIZATIONS = "AUTHORIZATIONS"
   SKIP_FLUSH = 'SKIP_FLUSH'
   CONSISTENCY = "CONSISTENCY"
+  ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME' # add_peer args key: custom replication endpoint class name
+  CLUSTER_KEY = 'CLUSTER_KEY' # add_peer args key: cluster key of the peer
+  TABLE_CFS = 'TABLE_CFS' # add_peer args key: table column families passed along with the peer
+  CONFIG = 'CONFIG' # add_peer args key: entries put into the peer configuration
+  DATA = 'DATA' # add_peer args key: entries put into the peer data
 
   # Load constants from hbase java API
   def self.promote_constants(constants)

http://git-wip-us.apache.org/repos/asf/hbase/blob/31fb0583/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 6dedb2e..2d0845f 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -19,21 +19,80 @@
 
 include Java
 
-# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
+java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
+java_import org.apache.hadoop.hbase.util.Bytes
+java_import org.apache.hadoop.hbase.zookeeper.ZKUtil
+
+# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
 
 module Hbase
   class RepAdmin
     include HBaseConstants
 
     def initialize(configuration, formatter)
-      @replication_admin = org.apache.hadoop.hbase.client.replication.ReplicationAdmin.new(configuration)
+      @replication_admin = ReplicationAdmin.new(configuration)
+      @configuration = configuration # kept so add_peer can derive a cluster key via ZKUtil
       @formatter = formatter
     end
 
     #----------------------------------------------------------------------------------------------
     # Add a new peer cluster to replicate to
-    def add_peer(id, cluster_key, peer_tableCFs = nil)
-      @replication_admin.addPeer(id, cluster_key, peer_tableCFs)
+    def add_peer(id, args = {}, peer_tableCFs = nil)
+      # Backwards compatible form: args is the cluster key String, peer_tableCFs is passed through
+      if args.is_a?(String)
+        cluster_key = args
+        @replication_admin.addPeer(id, cluster_key, peer_tableCFs)
+      elsif args.is_a?(Hash)
+        unless peer_tableCFs.nil?
+          raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args")
+        end
+
+        endpoint_classname = args.fetch(ENDPOINT_CLASSNAME, nil)
+        cluster_key = args.fetch(CLUSTER_KEY, nil)
+
+        # Exactly one of ENDPOINT_CLASSNAME and CLUSTER_KEY has to be given: raise ArgumentError
+        # when neither is provided and when both are provided
+        if endpoint_classname.nil? and cluster_key.nil?
+          raise(ArgumentError, "Either ENDPOINT_CLASSNAME or CLUSTER_KEY must be specified.")
+        elsif !endpoint_classname.nil? and !cluster_key.nil?
+          raise(ArgumentError, "ENDPOINT_CLASSNAME and CLUSTER_KEY cannot both be specified.")
+        end
+
+        # Only an endpoint class was given: derive the cluster key from @configuration via ZKUtil
+        if !endpoint_classname.nil? and cluster_key.nil?
+          cluster_key = ZKUtil.getZooKeeperClusterKey(@configuration)
+        end
+
+        # Optional parameters; each one is nil when its key is absent from args
+        config = args.fetch(CONFIG, nil)
+        data = args.fetch(DATA, nil)
+        table_cfs = args.fetch(TABLE_CFS, nil)
+
+        # Build the ReplicationPeerConfig that is handed to the replication admin below
+        replication_peer_config = ReplicationPeerConfig.new
+        replication_peer_config.set_cluster_key(cluster_key)
+
+        unless endpoint_classname.nil?
+          replication_peer_config.set_replication_endpoint_impl(endpoint_classname)
+        end
+
+        unless config.nil?
+          replication_peer_config.get_configuration.put_all(config)
+        end
+
+        unless data.nil?
+          # Every key and value goes through Bytes.to_bytes before it is put into peer_data
+          peer_data = replication_peer_config.get_peer_data
+          data.each{|key, val|
+            peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val))
+          }
+        end
+
+        @replication_admin.add_peer(id, replication_peer_config, table_cfs)
+      else
+        raise(ArgumentError, "args must be either a String or Hash")
+      end
     end
 
     #----------------------------------------------------------------------------------------------
@@ -48,7 +107,7 @@ module Hbase
     def list_replicated_tables(regex = ".*")
       pattern = java.util.regex.Pattern.compile(regex)
       list = @replication_admin.listReplicated()
-      list.select {|s| pattern.match(s.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME))}
+      list.select {|s| pattern.match(s.get(ReplicationAdmin.TNAME))} # keep entries whose TNAME value matches
     end
 
     #----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/31fb0583/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index ecd8e75..be01041 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -22,21 +22,47 @@ module Shell
     class AddPeer< Command
       def help
         return <<-EOF
-Add a peer cluster to replicate to, the id must be a short and
-the cluster key is composed like this:
+A peer can either be another HBase cluster or a custom replication endpoint. In either case an id
+must be specified to identify the peer.
+
+For an HBase cluster peer, a cluster key must be provided and is composed like this:
 hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-This gives a full path for HBase to connect to another cluster.
+This gives a full path for HBase to connect to another HBase cluster. An optional parameter for
+table column families identifies which column families will be replicated to the peer cluster.
 Examples:
 
   hbase> add_peer '1', "server1.cie.com:2181:/hbase"
   hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
-  hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "tab1; tab2:cf1; tab3:cf2,cf3"
+  hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "table1; table2:cf1; table3:cf1,cf2"
+  hbase> add_peer '4', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
+  hbase> add_peer '5', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
+    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+
+For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
+are DATA and CONFIG, which can be specified to set either the peer_data or the configuration
+for the custom replication endpoint. Table column families are optional and can be specified with
+the key TABLE_CFS.
+
+  hbase> add_peer '6', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint'
+  hbase> add_peer '7', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
+    DATA => { "key1" => 1 }
+  hbase> add_peer '8', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
+    CONFIG => { "config1" => "value1", "config2" => "value2" }
+  hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
+    DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }
+  hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
+    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+  hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
+    DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
+    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+
+Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified but not both.
 EOF
       end
 
-      def command(id, cluster_key, peer_tableCFs = nil)
+      def command(id, args = {}, peer_tableCFs = nil)
         format_simple_command do
-          replication_admin.add_peer(id, cluster_key, peer_tableCFs)
+          replication_admin.add_peer(id, args, peer_tableCFs) # args: cluster key String or options Hash
         end
       end
     end

http://git-wip-us.apache.org/repos/asf/hbase/blob/31fb0583/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
new file mode 100644
index 0000000..648efa7
--- /dev/null
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -0,0 +1,191 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'shell'
+require 'shell/formatter'
+require 'hbase'
+require 'hbase/hbase'
+require 'hbase/table'
+
+include HBaseConstants
+
+module Hbase
+  class ReplicationAdminTest < Test::Unit::TestCase
+    include TestHelpers
+
+    def setup
+      @test_name = "hbase_shell_tests_table"
+      @peer_id = '1'
+
+      setup_hbase
+      drop_test_table(@test_name)
+      create_test_table(@test_name)
+
+      assert_equal(0, replication_admin.list_peers.length)
+    end
+
+    def teardown
+      assert_equal(0, replication_admin.list_peers.length)
+
+      shutdown
+    end
+
+    define_test "add_peer: should fail when args isn't specified" do
+      assert_raise(ArgumentError) do
+        replication_admin.add_peer(@peer_id, nil)
+      end
+    end
+
+    define_test "add_peer: fail when neither CLUSTER_KEY nor ENDPOINT_CLASSNAME are specified" do
+      assert_raise(ArgumentError) do
+        args = {}
+        replication_admin.add_peer(@peer_id, args)
+      end
+    end
+
+    define_test "add_peer: fail when both CLUSTER_KEY and ENDPOINT_CLASSNAME are specified" do
+      assert_raise(ArgumentError) do
+        args = { CLUSTER_KEY => 'zk1,zk2,zk3:2182:/hbase-prod',
+                 ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint' }
+        replication_admin.add_peer(@peer_id, args)
+      end
+    end
+
+    define_test "add_peer: args must be a string or number" do
+      assert_raise(ArgumentError) do
+        replication_admin.add_peer(@peer_id, 1)
+      end
+      assert_raise(ArgumentError) do
+        replication_admin.add_peer(@peer_id, ['test'])
+      end
+    end
+
+    define_test "add_peer: single zk cluster key" do
+      cluster_key = "server1.cie.com:2181:/hbase"
+
+      replication_admin.add_peer(@peer_id, cluster_key)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: multiple zk cluster key" do
+      cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
+
+      replication_admin.add_peer(@peer_id, cluster_key)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key)
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: multiple zk cluster key and table_cfs" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      table_cfs_str = "table1;table2:cf1;table3:cf2,cf3"
+
+      replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: single zk cluster key - peer config" do
+      cluster_key = "server1.cie.com:2181:/hbase"
+
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: multiple zk cluster key - peer config" do
+      cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
+
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
+
+      args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
+      replication_admin.add_peer(@peer_id, args)
+
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
+
+      # cleanup for future tests
+      replication_admin.remove_peer(@peer_id)
+    end
+
+    define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
+
+      assert_raise(ArgumentError) do
+        args = { CLUSTER_KEY => cluster_key }
+        replication_admin.add_peer(@peer_id, args, table_cfs_str)
+      end
+    end
+
+    # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
+    # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
+    # define_test "add_peer: adding a second peer with same id should error" do
+    #   replication_admin.add_peer(@peer_id, '')
+    #   assert_equal(1, replication_admin.list_peers.length)
+    #
+    #   assert_raise(java.lang.IllegalArgumentException) do
+    #     replication_admin.add_peer(@peer_id, '')
+    #   end
+    #
+    #   assert_equal(1, replication_admin.list_peers.length, 1)
+    #
+    #   # cleanup for future tests
+    #   replication_admin.remove_peer(@peer_id)
+    # end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/31fb0583/hbase-shell/src/test/ruby/test_helper.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/test_helper.rb b/hbase-shell/src/test/ruby/test_helper.rb
index 5dfafc5..e2ab921 100644
--- a/hbase-shell/src/test/ruby/test_helper.rb
+++ b/hbase-shell/src/test/ruby/test_helper.rb
@@ -68,6 +68,10 @@ module Hbase
       @shell.hbase_visibility_labels_admin
     end
 
+    def replication_admin
+      @shell.hbase_replication_admin # accessor the tests use to reach the shell's replication admin
+    end
+
     def create_test_table(name)
       # Create the table if needed
       unless admin.exists?(name)