Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:57:00 UTC
[3/4] cassandra git commit: Remove Thrift dependencies in bundled tools
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 78080e2..3899f8c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
@@ -35,7 +38,7 @@ import org.apache.hadoop.util.Progressable;
* The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
*
* <p>
* As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat},
@@ -48,13 +51,14 @@ import org.apache.hadoop.util.Progressable;
* simple.
* </p>
*/
-public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+public class CqlBulkOutputFormat extends OutputFormat<Object, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.OutputFormat<Object, List<ByteBuffer>>
{
- private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
- private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+ private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.table.schema.";
+ private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.table.insert.";
private static final String DELETE_SOURCE = "cassandra.output.delete.source";
- private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
+ private static final String TABLE_ALIAS_PREFIX = "cqlbulkoutputformat.table.alias.";
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
@@ -75,33 +79,60 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
{
return new CqlBulkRecordWriter(context);
}
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+ }
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
- public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+ public static void setTableSchema(Configuration conf, String columnFamily, String schema)
{
conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
}
- public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+ public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
{
conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
}
- public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+ public static String getTableSchema(Configuration conf, String columnFamily)
{
String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
if (schema == null)
{
- throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+ throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
}
return schema;
}
- public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+ public static String getTableInsertStatement(Configuration conf, String columnFamily)
{
String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
if (insert == null)
{
- throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+ throw new UnsupportedOperationException("You must set the Table insert statement using setTableSchema.");
}
return insert;
}
@@ -116,13 +147,31 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
return conf.getBoolean(DELETE_SOURCE, false);
}
- public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+ public static void setTableAlias(Configuration conf, String alias, String columnFamily)
{
- conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+ conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
}
- public static String getColumnFamilyForAlias(Configuration conf, String alias)
+ public static String getTableForAlias(Configuration conf, String alias)
{
- return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+ return conf.get(TABLE_ALIAS_PREFIX + alias);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
}
}
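For reference, everything the renamed setters above configure flows through the Hadoop Configuration. A minimal job-setup sketch under assumed names (the "ks"/"words" keyspace, table, and statements are hypothetical, not part of this commit):

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadJobSetup
    {
        public static Job createJob() throws Exception
        {
            Configuration conf = new Configuration();
            // checkOutputSpecs() only insists on the keyspace, but the bulk
            // writer also needs the table, partitioner and a contact point.
            ConfigHelper.setOutputKeyspace(conf, "ks");
            ConfigHelper.setOutputColumnFamily(conf, "words");
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            // Schema and INSERT are keyed by table name, per the new setters.
            CqlBulkOutputFormat.setTableSchema(conf, "words",
                "CREATE TABLE ks.words (word text PRIMARY KEY, count int)");
            CqlBulkOutputFormat.setTableInsertStatement(conf, "words",
                "INSERT INTO ks.words (word, count) VALUES (?, ?)");

            Job job = Job.getInstance(conf, "bulk-load-example");
            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            return job;
        }
    }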
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 60cd511..e77c4c8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -17,17 +17,22 @@
*/
package org.apache.cassandra.hadoop.cql3;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
import org.apache.cassandra.hadoop.BulkRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
@@ -35,11 +40,12 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
-
/**
* The <code>CqlBulkRecordWriter</code> maps the output <key, value>
* pairs to a Cassandra column family. In particular, it applies the bound variables
@@ -54,10 +60,26 @@ import org.apache.hadoop.util.Progressable;
*
* @see CqlBulkOutputFormat
*/
-public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
+
private String keyspace;
- private String columnFamily;
+ private String table;
private String schema;
private String insertStatement;
private File outputDir;
@@ -65,19 +87,25 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
{
- super(context);
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
{
- super(conf, progress);
+ this(conf);
+ this.progress = progress;
setConfigs();
}
CqlBulkRecordWriter(Configuration conf) throws IOException
{
- super(conf);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
setConfigs();
}
@@ -85,54 +113,55 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
{
// if anything is missing, exceptions will be thrown here, instead of on write()
keyspace = ConfigHelper.getOutputKeyspace(conf);
- columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+ table = ConfigHelper.getOutputColumnFamily(conf);
- // check if columnFamily is aliased
- String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily);
+ // check if table is aliased
+ String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
if (aliasedCf != null)
- columnFamily = aliasedCf;
+ table = aliasedCf;
- schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
- insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
- outputDir = getColumnFamilyDirectory();
+ schema = CqlBulkOutputFormat.getTableSchema(conf, table);
+ insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
+ outputDir = getTableDirectory();
deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
}
-
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
+ }
+
private void prepareWriter() throws IOException
{
- try
+ if (writer == null)
{
- if (writer == null)
- {
- writer = CQLSSTableWriter.builder()
- .forTable(schema)
- .using(insertStatement)
- .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
- .inDirectory(outputDir)
- .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
- .build();
- }
- if (loader == null)
- {
- ExternalClient externalClient = new ExternalClient(conf);
-
- externalClient.addKnownCfs(keyspace, schema);
-
- this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
- @Override
- public void onSuccess(StreamState finalState)
- {
- if (deleteSrc)
- FileUtils.deleteRecursive(outputDir);
- }
- };
- }
+ writer = CQLSSTableWriter.builder()
+ .forTable(schema)
+ .using(insertStatement)
+ .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+ .inDirectory(outputDir)
+ .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+ .build();
}
- catch (Exception e)
+
+ if (loader == null)
{
- throw new IOException(e);
- }
+ ExternalClient externalClient = new ExternalClient(conf);
+ externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+ loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+ {
+ @Override
+ public void onSuccess(StreamState finalState)
+ {
+ if (deleteSrc)
+ FileUtils.deleteRecursive(outputDir);
+ }
+ };
+ }
}
/**
@@ -168,9 +197,9 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
}
}
- private File getColumnFamilyDirectory() throws IOException
+ private File getTableDirectory() throws IOException
{
- File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily, UUID.randomUUID().toString()));
+ File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString()));
if (!dir.exists() && !dir.mkdirs())
{
@@ -179,41 +208,83 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
return dir;
}
-
- public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
- private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-
- public ExternalClient(Configuration conf)
- {
- super(conf);
- }
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
- public void addKnownCfs(String keyspace, String cql)
+ private void close() throws IOException
+ {
+ if (writer != null)
{
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-
- if (cfs == null)
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
{
- cfs = new HashMap<>();
- knownCqlCfs.put(keyspace, cfs);
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (ExecutionException | TimeoutException te)
+ {
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
}
-
- CFMetaData metadata = CFMetaData.compile(cql, keyspace);
- cfs.put(metadata.cfName, metadata);
}
-
- @Override
- public CFMetaData getCFMetaData(String keyspace, String cfName)
+ }
+
+ public static class ExternalClient extends NativeSSTableLoaderClient
+ {
+ public ExternalClient(Configuration conf)
{
- CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
- if (metadata != null)
+ super(resolveHostAddresses(conf),
+ CqlConfigHelper.getOutputNativePort(conf),
+ ConfigHelper.getOutputKeyspaceUserName(conf),
+ ConfigHelper.getOutputKeyspacePassword(conf),
+ CqlConfigHelper.getSSLOptions(conf).orNull());
+ }
+
+ private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+ {
+ Set<InetAddress> addresses = new HashSet<>();
+
+ for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
{
- return metadata;
+ try
+ {
+ addresses.add(InetAddress.getByName(host));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
}
-
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
+
+ return addresses;
}
}
}
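The writer now owns its tunables as public Configuration keys rather than inheriting them from AbstractBulkRecordWriter. A hedged sketch of overriding them (values below are purely illustrative):

    import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
    import org.apache.hadoop.conf.Configuration;

    public class BulkWriterTuning
    {
        public static Configuration tune(Configuration conf)
        {
            // Local staging directory for SSTables; defaults to java.io.tmpdir.
            conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, "/var/tmp/bulk-sstables");
            // Buffer size in MB before an SSTable is flushed (default 64).
            conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, "128");
            // Outbound stream throttle in megabits; 0 means unthrottled.
            conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, "400");
            // How many failed hosts close() tolerates before throwing (default 0).
            conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, "1");
            return conf;
        }
    }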
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index ac5a7e5..3033fa6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -34,22 +34,23 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.io.util.FileUtils;
+import com.google.common.base.Optional;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
public class CqlConfigHelper
{
@@ -84,6 +85,7 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
private static final String OUTPUT_CQL = "cassandra.output.cql";
+ private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port";
/**
* Set the CQL columns for the input of this job.
@@ -176,6 +178,11 @@ public class CqlConfigHelper
return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
}
+ public static int getOutputNativePort(Configuration conf)
+ {
+ return Integer.parseInt(conf.get(OUTPUT_NATIVE_PORT, "9042"));
+ }
+
public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
{
return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
@@ -294,6 +301,22 @@ public class CqlConfigHelper
public static Cluster getInputCluster(String[] hosts, Configuration conf)
{
int port = getInputNativePort(conf);
+ return getCluster(hosts, conf, port);
+ }
+
+ public static Cluster getOutputCluster(String host, Configuration conf)
+ {
+ return getOutputCluster(new String[]{host}, conf);
+ }
+
+ public static Cluster getOutputCluster(String[] hosts, Configuration conf)
+ {
+ int port = getOutputNativePort(conf);
+ return getCluster(hosts, conf, port);
+ }
+
+ public static Cluster getCluster(String[] hosts, Configuration conf, int port)
+ {
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
Optional<Integer> protocolVersion = getProtocolVersion(conf);
@@ -301,11 +324,11 @@ public class CqlConfigHelper
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-
+
Cluster.Builder builder = Cluster.builder()
- .addContactPoints(hosts)
- .withPort(port)
- .withCompression(ProtocolOptions.Compression.NONE);
+ .addContactPoints(hosts)
+ .withPort(port)
+ .withCompression(ProtocolOptions.Compression.NONE);
if (authProvider.isPresent())
builder.withAuthProvider(authProvider.get());
@@ -316,14 +339,13 @@ public class CqlConfigHelper
builder.withProtocolVersion(protocolVersion.get());
}
builder.withLoadBalancingPolicy(loadBalancingPolicy)
- .withSocketOptions(socketOptions)
- .withQueryOptions(queryOptions)
- .withPoolingOptions(poolingOptions);
+ .withSocketOptions(socketOptions)
+ .withQueryOptions(queryOptions)
+ .withPoolingOptions(poolingOptions);
return builder.build();
}
-
public static void setInputCoreConnections(Configuration conf, String connections)
{
conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
@@ -502,7 +524,7 @@ public class CqlConfigHelper
return Optional.of(getClientAuthProvider(authProvider.get(), conf));
}
- private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+ public static Optional<SSLOptions> getSSLOptions(Configuration conf)
{
Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
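The new getOutputCluster()/getOutputNativePort() pair mirrors the existing input-side helpers. A minimal sketch of opening a native-protocol session through them (contact point and query are placeholders):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class OutputClusterExample
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();
            // cassandra.output.native.port defaults to 9042 when unset.
            Cluster cluster = CqlConfigHelper.getOutputCluster("127.0.0.1", conf);
            try (Session session = cluster.connect())
            {
                session.execute("SELECT release_version FROM system.local");
            }
            finally
            {
                cluster.close();
            }
        }
    }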
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 0d09ca2..9a1cda6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -23,15 +23,15 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * The <code>CqlOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
*
* <p>
* As is the case with the {@link org.apache.cassandra.hadoop.ColumnFamilyInputFormat},
@@ -52,8 +52,51 @@ import org.apache.hadoop.mapreduce.*;
* to Cassandra.
* </p>
*/
-public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
-{
+public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+ implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * @param context
+ * information about the job
+ */
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ protected void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+ if (ConfigHelper.getOutputPartitioner(conf) == null)
+ throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+ if (ConfigHelper.getOutputInitialAddress(conf) == null)
+ throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ /**
+ * The OutputCommitter for this format does not write any data to the DFS.
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -73,4 +116,25 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
{
return new CqlRecordWriter(context);
}
+
+ /**
+ * An {@link OutputCommitter} that does nothing.
+ */
+ private static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
}
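checkOutputSpecs() above spells out the three settings a job must provide. A sketch of a configuration that satisfies it (names are hypothetical; note that CqlRecordWriter appends the key WHERE clause itself and rejects INSERT, so the output CQL is an UPDATE):

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CqlOutputJobSetup
    {
        public static Job createJob() throws Exception
        {
            Configuration conf = new Configuration();
            ConfigHelper.setOutputKeyspace(conf, "ks");                    // required
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");       // required
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner"); // required
            ConfigHelper.setOutputColumnFamily(conf, "counts");
            CqlConfigHelper.setOutputCql(conf, "UPDATE ks.counts SET count = ?");

            Job job = Job.getInstance(conf, "cql-output-example");
            job.setOutputFormatClass(CqlOutputFormat.class);
            return job;
        }
    }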
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 308bdf8..4a7bd59 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -37,13 +37,15 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+import com.google.common.reflect.TypeToken;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -493,36 +495,72 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
@Override
+ public <T> List<T> getList(int i, TypeToken<T> typeToken)
+ {
+ return row.getList(i, typeToken);
+ }
+
+ @Override
public <T> List<T> getList(String name, Class<T> elementsClass)
{
return row.getList(name, elementsClass);
}
@Override
+ public <T> List<T> getList(String s, TypeToken<T> typeToken)
+ {
+ return row.getList(s, typeToken);
+ }
+
+ @Override
public <T> Set<T> getSet(int i, Class<T> elementsClass)
{
return row.getSet(i, elementsClass);
}
@Override
+ public <T> Set<T> getSet(int i, TypeToken<T> typeToken)
+ {
+ return row.getSet(i, typeToken);
+ }
+
+ @Override
public <T> Set<T> getSet(String name, Class<T> elementsClass)
{
return row.getSet(name, elementsClass);
}
@Override
+ public <T> Set<T> getSet(String s, TypeToken<T> typeToken)
+ {
+ return row.getSet(s, typeToken);
+ }
+
+ @Override
public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
{
return row.getMap(i, keysClass, valuesClass);
}
@Override
+ public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+ {
+ return row.getMap(i, typeToken, typeToken1);
+ }
+
+ @Override
public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
{
return row.getMap(name, keysClass, valuesClass);
}
@Override
+ public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+ {
+ return row.getMap(s, typeToken, typeToken1);
+ }
+
+ @Override
public UDTValue getUDTValue(int i)
{
return row.getUDTValue(i);
@@ -545,6 +583,24 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
return row.getTupleValue(name);
}
+
+ @Override
+ public Token getToken(int i)
+ {
+ return row.getToken(i);
+ }
+
+ @Override
+ public Token getToken(String name)
+ {
+ return row.getToken(name);
+ }
+
+ @Override
+ public Token getPartitionKeyToken()
+ {
+ return row.getPartitionKeyToken();
+ }
}
/**
@@ -604,36 +660,21 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private void fetchKeys()
{
- String query = String.format("SELECT column_name, component_index, type " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNS,
- keyspace,
- cfName);
-
// get CF meta data
- List<Row> rows = session.execute(query).all();
- if (rows.isEmpty())
+ TableMetadata tableMetadata = session.getCluster()
+ .getMetadata()
+ .getKeyspace(Metadata.quote(keyspace))
+ .getTable(Metadata.quote(cfName));
+ if (tableMetadata == null)
{
throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
}
- int numberOfPartitionKeys = 0;
- for (Row row : rows)
- if (row.getString(2).equals("partition_key"))
- numberOfPartitionKeys++;
- String[] partitionKeyArray = new String[numberOfPartitionKeys];
- for (Row row : rows)
+        // Here we assume that tableMetadata.getPartitionKey() always
+        // returns the list of columns in order of component_index.
+ for (ColumnMetadata partitionKey : tableMetadata.getPartitionKey())
{
- String type = row.getString(2);
- String column = row.getString(0);
- if (type.equals("partition_key"))
- {
- int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
- partitionKeyArray[componentIndex] = column;
- }
+ partitionKeys.add(partitionKey.getName());
}
- partitionKeys.addAll(Arrays.asList(partitionKeyArray));
}
private String quote(String identifier)
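The rewritten fetchKeys() is the heart of this hunk: instead of querying the legacy schema tables, it asks the driver's cluster metadata for the partition key. The same technique in isolation, as a sketch (it relies on getPartitionKey() returning columns in declaration order, per the comment above):

    import java.util.ArrayList;
    import java.util.List;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ColumnMetadata;
    import com.datastax.driver.core.Metadata;
    import com.datastax.driver.core.TableMetadata;

    public class PartitionKeyLookup
    {
        // Returns the partition key column names for keyspace.table,
        // resolved from driver metadata rather than system tables.
        public static List<String> partitionKeys(Cluster cluster, String keyspace, String table)
        {
            TableMetadata tm = cluster.getMetadata()
                                      .getKeyspace(Metadata.quote(keyspace))
                                      .getTable(Metadata.quote(table));
            if (tm == null)
                throw new RuntimeException("No table metadata found for " + keyspace + "." + table);

            List<String> keys = new ArrayList<>();
            for (ColumnMetadata pk : tm.getPartitionKey())
                keys.add(pk.getName());
            return keys;
        }
    }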
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dbbeb47..1d8436b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -21,37 +21,39 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TokenRange;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.Progressable;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * The <code>CqlRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra table. In particular, it applies the bound variables
* in the value to the prepared statement, which it associates with the key, and in
* turn the responsible endpoint.
*
@@ -63,21 +65,38 @@ import org.apache.thrift.transport.TTransport;
*
* @see CqlOutputFormat
*/
-class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements AutoCloseable
+class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements
+ org.apache.hadoop.mapred.RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);
+ // The configuration this writer is associated with.
+ protected final Configuration conf;
+ // The number of mutations to buffer per endpoint
+ protected final int queueSize;
+
+ protected final long batchThreshold;
+
+ protected Progressable progressable;
+ protected TaskAttemptContext context;
+
+ // The ring cache that describes the token ranges each node in the ring is
+ // responsible for. This is what allows us to group the mutations by
+ // the endpoints they should be targeted at. The targeted endpoint
+    // essentially acts as the primary replica for the rows being affected by the mutations.
+ private final NativeRingCache ringCache;
+
// handles for clients for each range running in the threadpool
protected final Map<InetAddress, RangeClient> clients;
// host to prepared statement id mappings
- protected final ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+ protected final ConcurrentHashMap<Session, PreparedStatement> preparedStatements = new ConcurrentHashMap<Session, PreparedStatement>();
protected final String cql;
- protected AbstractType<?> keyValidator;
- protected String [] partitionKeyColumns;
- protected List<String> clusterColumns;
+ protected List<ColumnMetadata> partitionKeyColumns;
+ protected List<ColumnMetadata> clusterColumns;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -100,28 +119,28 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
CqlRecordWriter(Configuration conf)
{
- super(conf);
+ this.conf = conf;
+ this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
try
{
- Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+ ringCache = new NativeRingCache(conf);
if (client != null)
{
- client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
- String user = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
- if ((user != null) && (password != null))
- AbstractColumnFamilyOutputFormat.login(user, password, client);
- retrievePartitionKeyValidator(client);
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
if (cqlQuery.toLowerCase().startsWith("insert"))
throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
cql = appendKeyWhereClauses(cqlQuery);
- TTransport transport = client.getOutputProtocol().getTransport();
- if (transport.isOpen())
- transport.close();
+ client.close();
}
else
{
@@ -133,7 +152,26 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
throw new RuntimeException(e);
}
}
-
+
+ /**
+ * Close this <code>RecordWriter</code> to future operations, but not before
+ * flushing out the batched mutations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ */
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
@Override
public void close() throws IOException
{
@@ -157,7 +195,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
/**
* If the key is to be associated with a valid value, a mutation is created
- * for it with the given column family and columns. In the event the value
+ * for it with the given table and columns. In the event the value
* in the column is missing (i.e., null), then it is marked for
* {@link Deletion}. Similarly, if the entire value for a key is missing
* (i.e., null), then the entire key is marked for {@link Deletion}.
@@ -172,25 +210,25 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
@Override
public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
{
- Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
+ TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
- final InetAddress address = ringCache.getEndpoint(range).get(0);
+ final InetAddress address = ringCache.getEndpoints(range).get(0);
RangeClient client = clients.get(address);
if (client == null)
{
// haven't seen keys for this range: create new client
- client = new RangeClient(ringCache.getEndpoint(range));
+ client = new RangeClient(ringCache.getEndpoints(range));
client.start();
clients.put(address, client);
}
// add primary key columns to the bind variables
List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
- for (String column : partitionKeyColumns)
- allValues.add(keyColumns.get(column));
- for (String column : clusterColumns)
- allValues.add(keyColumns.get(column));
+ for (ColumnMetadata column : partitionKeyColumns)
+ allValues.add(keyColumns.get(column.getName()));
+ for (ColumnMetadata column : clusterColumns)
+ allValues.add(keyColumns.get(column.getName()));
client.put(allValues);
@@ -204,16 +242,50 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Bound variables for keys in that range are sent to this client via a queue.
*/
- public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+ public class RangeClient extends Thread
{
+ // The list of endpoints for this range
+ protected final List<InetAddress> endpoints;
+ protected Session client;
+ // A bounded queue of incoming mutations for this range
+ protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
+
+ protected volatile boolean run = true;
+ // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when put() is next called, or, if there are no more put calls,
+ // when the client is closed.
+ protected volatile IOException lastException;
+
/**
* Constructs an {@link RangeClient} for the given endpoints.
* @param endpoints the possible endpoints to execute the mutations on
*/
public RangeClient(List<InetAddress> endpoints)
{
- super(endpoints);
- }
+ super("client-" + endpoints);
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * enqueues the given value to Cassandra
+ */
+ public void put(List<ByteBuffer> value) throws IOException
+ {
+ while (true)
+ {
+ if (lastException != null)
+ throw lastException;
+ try
+ {
+ if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
/**
* Loops collecting CQL bound variable values from the queue and sending them to Cassandra
@@ -234,156 +306,138 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
continue;
}
- Iterator<InetAddress> iter = endpoints.iterator();
+ ListIterator<InetAddress> iter = endpoints.listIterator();
while (true)
{
// send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
+
+ // attempt to connect to a different endpoint
try
{
- int i = 0;
- int itemId = preparedStatement(client);
- while (bindVariables != null)
- {
- client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
- i++;
-
- if (i >= batchThreshold)
- break;
-
- bindVariables = queue.poll();
- }
-
- break;
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ client = CqlConfigHelper.getOutputCluster(host, conf).connect();
}
catch (Exception e)
{
+                        // If the connection died due to an interrupt, just try connecting to the endpoint again.
+                        if (Thread.interrupted())
+                        {
+                            lastException = new IOException(e);
+                            iter.previous();
+                        }
closeInternal();
- if (!iter.hasNext())
+
+ // Most exceptions mean something unexpected went wrong to that endpoint, so
+ // we should try again to another. Other exceptions (auth or invalid request) are fatal.
+ if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
{
lastException = new IOException(e);
break outer;
}
}
- // attempt to connect to a different endpoint
try
{
- InetAddress address = iter.next();
- String host = address.getHostName();
- int port = ConfigHelper.getOutputRpcPort(conf);
- client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+ int i = 0;
+ PreparedStatement statement = preparedStatement(client);
+ while (bindVariables != null)
+ {
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ client.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
+ }
+ break;
}
catch (Exception e)
{
closeInternal();
- // TException means something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((!(e instanceof TException)) || !iter.hasNext())
+ if (!iter.hasNext())
{
lastException = new IOException(e);
break outer;
}
}
+
}
}
-
// close all our connections once we are done.
closeInternal();
}
/** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
- private int preparedStatement(Cassandra.Client client)
+ private PreparedStatement preparedStatement(Session client)
{
- Integer itemId = preparedStatements.get(client);
- if (itemId == null)
+ PreparedStatement statement = preparedStatements.get(client);
+ if (statement == null)
{
- CqlPreparedResult result;
+ PreparedStatement result;
try
{
- result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+ result = client.prepare(cql);
}
- catch (TException e)
+ catch (NoHostAvailableException e)
{
throw new RuntimeException("failed to prepare cql query " + cql, e);
}
- Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
- itemId = previousId == null ? result.itemId : previousId;
+ PreparedStatement previousId = preparedStatements.putIfAbsent(client, result);
+ statement = previousId == null ? result : previousId;
}
- return itemId;
+ return statement;
}
- }
- private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
- {
- ByteBuffer partitionKey;
- if (keyValidator instanceof CompositeType)
+ public void close() throws IOException
{
- ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
- for (int i = 0; i< keys.length; i++)
- keys[i] = keyColumns.get(partitionKeyColumns[i]);
+ // stop the run loop. this will result in closeInternal being called by the time join() finishes.
+ run = false;
+ interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
- partitionKey = CompositeType.build(keys);
- }
- else
- {
- partitionKey = keyColumns.get(partitionKeyColumns[0]);
+ if (lastException != null)
+ throw lastException;
}
- return partitionKey;
- }
- // FIXME
- /** retrieve the key validator from system.schema_columnfamilies table */
- private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
- {
- String keyspace = ConfigHelper.getOutputKeyspace(conf);
- String cfName = ConfigHelper.getOutputColumnFamily(conf);
- String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
- "FROM %s.%s " +
- "WHERE keyspace_name = '%s' and columnfamily_name = '%s'",
- SystemKeyspace.NAME,
- LegacySchemaTables.COLUMNFAMILIES,
- keyspace,
- cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
- Column rawKeyValidator = result.rows.get(0).columns.get(0);
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
- keyValidator = parseType(validator);
-
- Column rawPartitionKeys = result.rows.get(0).columns.get(1);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
- logger.debug("partition keys: {}", keyString);
-
- List<String> keys = FBUtilities.fromJsonList(keyString);
- partitionKeyColumns = new String[keys.size()];
- int i = 0;
- for (String key : keys)
+
+ protected void closeInternal()
{
- partitionKeyColumns[i] = key;
- i++;
+ if (client != null)
+ {
+                client.close();
+ }
}
-
- Column rawClusterColumns = result.rows.get(0).columns.get(2);
- String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
-
- logger.debug("cluster columns: {}", clusterColumnString);
- clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
}
- private AbstractType<?> parseType(String type) throws ConfigurationException
+ private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
{
- try
+ ByteBuffer partitionKey;
+ if (partitionKeyColumns.size() > 1)
{
- // always treat counters like longs, specifically CCT.serialize is not what we need
- if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
- return LongType.instance;
- return TypeParser.parse(type);
+ ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.size()];
+            for (int i = 0; i < keys.length; i++)
+ keys[i] = keyColumns.get(partitionKeyColumns.get(i).getName());
+
+ partitionKey = CompositeType.build(keys);
}
- catch (SyntaxException e)
+ else
{
- throw new ConfigurationException(e.getMessage(), e);
+ partitionKey = keyColumns.get(partitionKeyColumns.get(0).getName());
}
+ return partitionKey;
}
/**
@@ -393,10 +447,10 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
{
String keyWhereClause = "";
- for (String partitionKey : partitionKeyColumns)
- keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey)));
- for (String clusterColumn : clusterColumns)
- keyWhereClause += " AND " + quote(clusterColumn) + " = ?";
+ for (ColumnMetadata partitionKey : partitionKeyColumns)
+ keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey.getName()) : (" AND " + quote(partitionKey.getName())));
+ for (ColumnMetadata clusterColumn : clusterColumns)
+ keyWhereClause += " AND " + quote(clusterColumn.getName()) + " = ?";
return cqlQuery + " WHERE " + keyWhereClause;
}
@@ -406,4 +460,60 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
}
+
+ class NativeRingCache
+ {
+ private Map<TokenRange, Set<Host>> rangeMap;
+ private Metadata metadata;
+ private final IPartitioner partitioner;
+ private final Configuration conf;
+
+ public NativeRingCache(Configuration conf)
+ {
+ this.conf = conf;
+ this.partitioner = ConfigHelper.getOutputPartitioner(conf);
+ refreshEndpointMap();
+ }
+
+
+ private void refreshEndpointMap()
+ {
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
+ Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+ rangeMap = new HashMap<>();
+ metadata = session.getCluster().getMetadata();
+ Set<TokenRange> ranges = metadata.getTokenRanges();
+ for (TokenRange range : ranges)
+ {
+ rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ }
+ }
+
+ public TokenRange getRange(ByteBuffer key)
+ {
+ Token t = partitioner.getToken(key);
+ com.datastax.driver.core.Token driverToken = metadata.newToken(partitioner.getTokenFactory().toString(t));
+ for (TokenRange range : rangeMap.keySet())
+ {
+ if (range.contains(driverToken))
+ {
+ return range;
+ }
+ }
+
+ throw new RuntimeException("Invalid token information returned by describe_ring: " + rangeMap);
+ }
+
+ public List<InetAddress> getEndpoints(TokenRange range)
+ {
+            Set<Host> hostSet = rangeMap.get(range);
+            List<Host> hosts = new ArrayList<>(hostSet);
+ List<InetAddress> addresses = new ArrayList<>(hosts.size());
+ for (Host host: hosts)
+ {
+ addresses.add(host.getAddress());
+ }
+ return addresses;
+ }
+ }
}
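One idiom worth noting from preparedStatement() above: statements are prepared speculatively, and the ConcurrentHashMap.putIfAbsent() result decides which copy wins, so racing threads converge on a single PreparedStatement per Session. The pattern in isolation, as a sketch (Session and the CQL string stand in for the writer's fields):

    import java.util.concurrent.ConcurrentHashMap;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    public class StatementCache
    {
        private final ConcurrentHashMap<Session, PreparedStatement> cache = new ConcurrentHashMap<>();

        public PreparedStatement get(Session session, String cql)
        {
            PreparedStatement statement = cache.get(session);
            if (statement == null)
            {
                // Both racers prepare, but only the first store sticks;
                // the loser adopts the winner's statement.
                PreparedStatement prepared = session.prepare(cql);
                PreparedStatement previous = cache.putIfAbsent(session, prepared);
                statement = previous == null ? prepared : previous;
            }
            return statement;
        }
    }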