You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/18 18:33:01 UTC
[4/4] git commit: Allow BRW to authenticate. Patch by Michal Michalski, reviewed by brandonwilliams for CASSANDRA-4155
Allow BRW (BulkRecordWriter) to authenticate.
Patch by Michal Michalski, reviewed by brandonwilliams for
CASSANDRA-4155
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6fcd1f47
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6fcd1f47
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6fcd1f47
Branch: refs/heads/cassandra-1.1
Commit: 6fcd1f4704610f5073028faa5ccf027d81b0f470
Parents: ec242b7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 18 11:25:11 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 18 11:30:07 2012 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/BulkRecordWriter.java | 32 ++++++++++++++-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 10 +++++
2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 83646fb..cfd5fe4 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -30,6 +30,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -134,8 +135,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
if (writer == null)
{
AbstractType<?> subcomparator = null;
+ ExternalClient externalClient = null;
+ String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+ String password = ConfigHelper.getOutputKeyspacePassword(conf);
+
if (cfType == CFType.SUPER)
subcomparator = BytesType.instance;
+
this.writer = new SSTableSimpleUnsortedWriter(
outputdir,
ConfigHelper.getOutputPartitioner(conf),
@@ -145,7 +151,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
subcomparator,
Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")),
ConfigHelper.getOutputCompressionParamaters(conf));
- this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
+
+ externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
+ ConfigHelper.getOutputRpcPort(conf),
+ username,
+ password);
+
+ this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
}
}
@@ -239,12 +251,16 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
private String hostlist;
private int rpcPort;
+ private final String username;
+ private final String password;
- public ExternalClient(String hostlist, int port)
+ public ExternalClient(String hostlist, int port, String username, String password)
{
super();
this.hostlist = hostlist;
this.rpcPort = port;
+ this.username = username;
+ this.password = password;
}
public void init(String keyspace)
@@ -269,6 +285,18 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
{
InetAddress host = hostiter.next();
Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
+
+ // log in
+ client.set_keyspace(keyspace);
+ if (username != null)
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(IAuthenticator.USERNAME_KEY, username);
+ creds.put(IAuthenticator.PASSWORD_KEY, password);
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+
List<TokenRange> tokenRanges = client.describe_ring(keyspace);
List<KsDef> ksDefs = client.describe_keyspaces();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index b2903a1..87dd5e0 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -302,11 +302,21 @@ public class ConfigHelper
return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
}
+ public static void setOutputKeyspaceUserName(Configuration conf, String username)
+ {
+ conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username);
+ }
+
public static String getOutputKeyspaceUserName(Configuration conf)
{
return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG);
}
+ public static void setOutputKeyspacePassword(Configuration conf, String password)
+ {
+ conf.set(OUTPUT_KEYSPACE_PASSWD_CONFIG, password);
+ }
+
public static String getOutputKeyspacePassword(Configuration conf)
{
return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);