You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/01/09 17:33:07 UTC
git commit: Separate input and output connection details in
ConfigHelper. Patch by Mck SembWever,
reviewed by brandonwilliams for CASSANDRA-3197
Updated Branches:
refs/heads/trunk 710609baf -> b90462aef
Separate input and output connection details in ConfigHelper.
Patch by Mck SembWever, reviewed by brandonwilliams for CASSANDRA-3197
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b90462ae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b90462ae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b90462ae
Branch: refs/heads/trunk
Commit: b90462aefbab4e443a4a4d83da7a30cd4516697f
Parents: 710609b
Author: Brandon Williams <br...@apache.org>
Authored: Mon Jan 9 10:23:53 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Jan 9 10:23:53 2012 -0600
----------------------------------------------------------------------
contrib/pig/README.txt | 10 ++
.../cassandra/hadoop/pig/CassandraStorage.java | 67 +++++++++---
.../org/apache/cassandra/client/RingCache.java | 6 +-
.../cassandra/hadoop/ColumnFamilyInputFormat.java | 6 +-
.../cassandra/hadoop/ColumnFamilyRecordReader.java | 2 +-
.../cassandra/hadoop/ColumnFamilyRecordWriter.java | 2 +-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 86 +++++++++++---
.../org/apache/cassandra/client/TestRingCache.java | 6 +-
8 files changed, 139 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/contrib/pig/README.txt
----------------------------------------------------------------------
diff --git a/contrib/pig/README.txt b/contrib/pig/README.txt
index 93eceb2..604030e 100644
--- a/contrib/pig/README.txt
+++ b/contrib/pig/README.txt
@@ -27,6 +27,16 @@ export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
+These properties can be overridden with the following if you use different clusters
+for input and output:
+* PIG_INPUT_INITIAL_ADDRESS : initial address to connect to for reading
+* PIG_INPUT_RPC_PORT : the port thrift is listening on for reading
+* PIG_INPUT_PARTITIONER : cluster partitioner for reading
+* PIG_OUTPUT_INITIAL_ADDRESS : initial address to connect to for writing
+* PIG_OUTPUT_RPC_PORT : the port thrift is listening on for writing
+* PIG_OUTPUT_PARTITIONER : cluster partitioner for writing
+
+
Then you can build and run it like this:
contrib/pig$ ant
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 2b41abf..9a84646 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.Mutation;
@@ -49,15 +48,10 @@ import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
/**
* A LoadStoreFunc for retrieving data from and storing data to Cassandra
@@ -68,6 +62,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
// system environment variables that can be set to configure connection info:
// alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+ public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+ public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+ public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+ public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+ public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+ public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
@@ -288,17 +288,36 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private void setConnectionInformation() throws IOException
{
if (System.getenv(PIG_RPC_PORT) != null)
- ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
- else if (ConfigHelper.getRpcPort(conf) == 0)
- throw new IOException("PIG_RPC_PORT environment variable not set");
+ {
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ }
+
+ if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+ ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+ if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+ ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
if (System.getenv(PIG_INITIAL_ADDRESS) != null)
- ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
- else if (ConfigHelper.getInitialAddress(conf) == null)
- throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
+ {
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ }
+ if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+ if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+ ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
if (System.getenv(PIG_PARTITIONER) != null)
- ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
- else if (ConfigHelper.getPartitioner(conf) == null)
- throw new IOException("PIG_PARTITIONER environment variable not set");
+ {
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ }
+ if(System.getenv(PIG_INPUT_PARTITIONER) != null)
+ ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+ if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+ ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+
}
@Override
@@ -314,6 +333,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
+
+ if (ConfigHelper.getInputRpcPort(conf) == 0)
+ throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getInputInitialAddress(conf) == null)
+ throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getInputPartitioner(conf) == null)
+ throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
initSchema(loadSignature);
}
@@ -448,6 +475,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
setLocationFromUri(location);
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
+
+ if (ConfigHelper.getOutputRpcPort(conf) == 0)
+ throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
initSchema(storeSignature);
}
@@ -565,7 +600,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
Cassandra.Client client = null;
try
{
- client = ConfigHelper.getClientFromAddressList(conf);
+ client = ConfigHelper.getClientFromInputAddressList(conf);
CfDef cfDef = null;
client.set_keyspace(keyspace);
KsDef ksDef = client.describe_keyspace(keyspace);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/client/RingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/client/RingCache.java b/src/java/org/apache/cassandra/client/RingCache.java
index 8d6648c..9b28a83 100644
--- a/src/java/org/apache/cassandra/client/RingCache.java
+++ b/src/java/org/apache/cassandra/client/RingCache.java
@@ -55,7 +55,7 @@ public class RingCache
public RingCache(Configuration conf) throws IOException
{
this.conf = conf;
- this.partitioner = ConfigHelper.getPartitioner(conf);
+ this.partitioner = ConfigHelper.getOutputPartitioner(conf);
refreshEndpointMap();
}
@@ -63,7 +63,7 @@ public class RingCache
{
try {
- Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
+ Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
rangeMap = ArrayListMultimap.create();
@@ -96,7 +96,7 @@ public class RingCache
}
catch (TException e)
{
- logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage());
+ logger_.debug("Error contacting seed list" + ConfigHelper.getOutputInitialAddress(conf) + " " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index a23e999..c13e881 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -126,7 +126,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
Range<Token> jobRange = null;
if (jobKeyRange != null)
{
- partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
+ partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
assert jobKeyRange.start_key == null : "only start_token supported";
assert jobKeyRange.end_key == null : "only end_token supported";
@@ -239,7 +239,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
try
{
- Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true);
+ Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getInputRpcPort(conf), true);
client.set_keyspace(keyspace);
return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
}
@@ -262,7 +262,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
private List<TokenRange> getRangeMap(Configuration conf) throws IOException
{
- Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
+ Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
List<TokenRange> map;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index b84eb85..e3d1bb0 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -149,7 +149,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// create connection using thrift
String location = getLocation();
- socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
+ socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
client = new Cassandra.Client(binaryProtocol);
socket.open();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 328a0f7..5dc7655 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -314,7 +314,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
try
{
InetAddress address = iter.next();
- thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getRpcPort(conf));
+ thriftSocket = new TSocket(address.getHostName(), ConfigHelper.getOutputRpcPort(conf));
thriftClient = ColumnFamilyOutputFormat.createAuthenticatedClient(thriftSocket, conf);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index d8a1ab9..47407c0 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -45,7 +45,8 @@ import org.slf4j.LoggerFactory;
public class ConfigHelper
{
- private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class";
+ private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class";
+ private static final String OUTPUT_PARTITIONER_CONFIG = "cassandra.output.partitioner.class";
private static final String INPUT_KEYSPACE_CONFIG = "cassandra.input.keyspace";
private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace";
private static final String INPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.input.keyspace.username";
@@ -56,13 +57,14 @@ public class ConfigHelper
private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
- private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
- private static final String THRIFT_PORT = "cassandra.thrift.port";
- private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
+ private static final String INPUT_THRIFT_PORT = "cassandra.input.thrift.port";
+ private static final String OUTPUT_THRIFT_PORT = "cassandra.output.thrift.port";
+ private static final String INPUT_INITIAL_THRIFT_ADDRESS = "cassandra.input.thrift.address";
+ private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
@@ -309,36 +311,36 @@ public class ConfigHelper
return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE");
}
- public static int getRpcPort(Configuration conf)
+ public static int getInputRpcPort(Configuration conf)
{
- return Integer.parseInt(conf.get(THRIFT_PORT));
+ return Integer.parseInt(conf.get(INPUT_THRIFT_PORT));
}
- public static void setRpcPort(Configuration conf, String port)
+ public static void setInputRpcPort(Configuration conf, String port)
{
- conf.set(THRIFT_PORT, port);
+ conf.set(INPUT_THRIFT_PORT, port);
}
- public static String getInitialAddress(Configuration conf)
+ public static String getInputInitialAddress(Configuration conf)
{
- return conf.get(INITIAL_THRIFT_ADDRESS);
+ return conf.get(INPUT_INITIAL_THRIFT_ADDRESS);
}
- public static void setInitialAddress(Configuration conf, String address)
+ public static void setInputInitialAddress(Configuration conf, String address)
{
- conf.set(INITIAL_THRIFT_ADDRESS, address);
+ conf.set(INPUT_INITIAL_THRIFT_ADDRESS, address);
}
- public static void setPartitioner(Configuration conf, String classname)
+ public static void setInputPartitioner(Configuration conf, String classname)
{
- conf.set(PARTITIONER_CONFIG, classname);
+ conf.set(INPUT_PARTITIONER_CONFIG, classname);
}
- public static IPartitioner getPartitioner(Configuration conf)
+ public static IPartitioner getInputPartitioner(Configuration conf)
{
try
{
- return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG));
+ return FBUtilities.newPartitioner(conf.get(INPUT_PARTITIONER_CONFIG));
}
catch (ConfigurationException e)
{
@@ -346,17 +348,63 @@ public class ConfigHelper
}
}
+ public static int getOutputRpcPort(Configuration conf)
+ {
+ return Integer.parseInt(conf.get(OUTPUT_THRIFT_PORT));
+ }
+
+ public static void setOutputRpcPort(Configuration conf, String port)
+ {
+ conf.set(OUTPUT_THRIFT_PORT, port);
+ }
+
+ public static String getOutputInitialAddress(Configuration conf)
+ {
+ return conf.get(OUTPUT_INITIAL_THRIFT_ADDRESS);
+ }
+
+ public static void setOutputInitialAddress(Configuration conf, String address)
+ {
+ conf.set(OUTPUT_INITIAL_THRIFT_ADDRESS, address);
+ }
+
+ public static void setOutputPartitioner(Configuration conf, String classname)
+ {
+ conf.set(OUTPUT_PARTITIONER_CONFIG, classname);
+ }
+
+ public static IPartitioner getOutputPartitioner(Configuration conf)
+ {
+ try
+ {
+ return FBUtilities.newPartitioner(conf.get(OUTPUT_PARTITIONER_CONFIG));
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
+ {
+ return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf));
+ }
+
+ public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws IOException
+ {
+ return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","), ConfigHelper.getOutputRpcPort(conf));
+ }
- public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException
+ private static Cassandra.Client getClientFromAddressList(Configuration conf, String[] addresses, int port) throws IOException
{
- String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
Cassandra.Client client = null;
List<IOException> exceptions = new ArrayList<IOException>();
for (String address : addresses)
{
try
{
- client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
+ client = createConnection(address, port, true);
break;
}
catch (IOException ioe)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b90462ae/test/unit/org/apache/cassandra/client/TestRingCache.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/client/TestRingCache.java b/test/unit/org/apache/cassandra/client/TestRingCache.java
index 58c4d26..4fae42b 100644
--- a/test/unit/org/apache/cassandra/client/TestRingCache.java
+++ b/test/unit/org/apache/cassandra/client/TestRingCache.java
@@ -63,9 +63,9 @@ public class TestRingCache
thriftClient = cassandraClient;
String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
conf = new Configuration();
- ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName());
- ConfigHelper.setInitialAddress(conf, seed);
- ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort()));
+ ConfigHelper.setOutputPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName());
+ ConfigHelper.setOutputInitialAddress(conf, seed);
+ ConfigHelper.setOutputRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort()));
}