You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/18 18:33:01 UTC

[4/4] git commit: Allow BRW to authenticate. Patch by Michal Michalski, reviewed by brandonwilliams for CASSANDRA-4155

Allow BulkRecordWriter (BRW) to authenticate.
Patch by Michal Michalski, reviewed by brandonwilliams for
CASSANDRA-4155


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6fcd1f47
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6fcd1f47
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6fcd1f47

Branch: refs/heads/cassandra-1.1
Commit: 6fcd1f4704610f5073028faa5ccf027d81b0f470
Parents: ec242b7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 18 11:25:11 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 18 11:30:07 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |   32 ++++++++++++++-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   10 +++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 83646fb..cfd5fe4 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -30,6 +30,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -134,8 +135,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;
+            ExternalClient externalClient = null;
+            String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+            String password = ConfigHelper.getOutputKeyspacePassword(conf);
+
             if (cfType == CFType.SUPER)
                 subcomparator = BytesType.instance;
+            
             this.writer = new SSTableSimpleUnsortedWriter(
                     outputdir,
                     ConfigHelper.getOutputPartitioner(conf),
@@ -145,7 +151,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     subcomparator,
                     Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")),
                     ConfigHelper.getOutputCompressionParamaters(conf));
-            this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
+
+            externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), 
+                                                ConfigHelper.getOutputRpcPort(conf),
+                                                username,
+                                                password);
+
+            this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
         }
     }
 
@@ -239,12 +251,16 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
         private String hostlist;
         private int rpcPort;
+        private final String username;
+        private final String password;
 
-        public ExternalClient(String hostlist, int port)
+        public ExternalClient(String hostlist, int port, String username, String password)
         {
             super();
             this.hostlist = hostlist;
             this.rpcPort = port;
+            this.username = username;
+            this.password = password;
         }
 
         public void init(String keyspace)
@@ -269,6 +285,18 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 {
                     InetAddress host = hostiter.next();
                     Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
+
+                    // log in
+                    client.set_keyspace(keyspace);
+                    if (username != null)
+                    {
+                        Map<String, String> creds = new HashMap<String, String>();
+                        creds.put(IAuthenticator.USERNAME_KEY, username);
+                        creds.put(IAuthenticator.PASSWORD_KEY, password);
+                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+                        client.login(authRequest);
+                    }
+
                     List<TokenRange> tokenRanges = client.describe_ring(keyspace);
                     List<KsDef> ksDefs = client.describe_keyspaces();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index b2903a1..87dd5e0 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -302,11 +302,21 @@ public class ConfigHelper
     	return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
     }
 
+    public static void setOutputKeyspaceUserName(Configuration conf, String username)
+    {
+        conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username);
+    }
+
     public static String getOutputKeyspaceUserName(Configuration conf)
     {
     	return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG);
     }
 
+    public static void setOutputKeyspacePassword(Configuration conf, String password)
+    {
+        conf.set(OUTPUT_KEYSPACE_PASSWD_CONFIG, password);
+    }
+
     public static String getOutputKeyspacePassword(Configuration conf)
     {
     	return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);