Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/14 22:51:18 UTC
[2/5] git commit: backport CASSANDRA-6927 to 2.0
backport CASSANDRA-6927 to 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44764c03
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44764c03
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44764c03
Branch: refs/heads/trunk
Commit: 44764c03e892bfdb7294ad32d9ff703909186917
Parents: efde6ae
Author: Brandon Williams <br...@apache.org>
Authored: Wed Aug 13 14:14:38 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Aug 13 14:14:38 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractBulkOutputFormat.java | 73 ++++++
.../hadoop/AbstractBulkRecordWriter.java | 251 ++++++++++++++++++
.../cassandra/hadoop/BulkOutputFormat.java | 49 +---
.../cassandra/hadoop/BulkRecordWriter.java | 259 ++-----------------
.../hadoop/cql3/CqlBulkOutputFormat.java | 106 ++++++++
.../hadoop/cql3/CqlBulkRecordWriter.java | 199 ++++++++++++++
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 2 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 3 +-
11 files changed, 663 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
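For anyone trying the new CQL bulk path, a minimal job-setup sketch follows. The keyspace, table, schema, and insert strings are hypothetical; the ConfigHelper setters and the two CqlBulkOutputFormat helpers are the ones used or introduced by this patch.

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CqlBulkJobSetup
{
    public static void configure(Job job)
    {
        Configuration conf = job.getConfiguration();

        // Hypothetical table; schema and insert statement are registered per column family.
        String schema = "CREATE TABLE ks.users (id int PRIMARY KEY, name text)";
        String insert = "INSERT INTO ks.users (id, name) VALUES (?, ?)";

        ConfigHelper.setOutputColumnFamily(conf, "ks", "users");
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

        CqlBulkOutputFormat.setColumnFamilySchema(conf, "users", schema);
        CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, "users", insert);

        // Reducer output types: an ignored key, plus one List<ByteBuffer> of bound values per row.
        job.setOutputFormatClass(CqlBulkOutputFormat.class);
    }
}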
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d31948..1ac22f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
* Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
* (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
* Fix MS expiring map timeout for Paxos messages (CASSANDRA-7752)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+ implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setOutputColumnFamily()");
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
+
+ protected AbstractBulkRecordWriter(TaskAttemptContext context)
+ {
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
+ }
+
+ protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+ {
+ this(conf);
+ this.progress = progress;
+ }
+
+ protected AbstractBulkRecordWriter(Configuration conf)
+ {
+ Config.setClientMode(true);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+ }
+
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ if (writer != null)
+ {
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
+ {
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (ExecutionException | TimeoutException te)
+ {
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+ }
+ }
+ }
+
+ public static class ExternalClient extends SSTableLoader.Client
+ {
+ private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
+ private final Configuration conf;
+ private final String hostlist;
+ private final int rpcPort;
+ private final String username;
+ private final String password;
+
+ public ExternalClient(Configuration conf)
+ {
+ super();
+ this.conf = conf;
+ this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
+ this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
+ this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
+ this.password = ConfigHelper.getOutputKeyspacePassword(conf);
+ }
+
+ public void init(String keyspace)
+ {
+ Set<InetAddress> hosts = new HashSet<InetAddress>();
+ String[] nodes = hostlist.split(",");
+ for (String node : nodes)
+ {
+ try
+ {
+ hosts.add(InetAddress.getByName(node));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ Iterator<InetAddress> hostiter = hosts.iterator();
+ while (hostiter.hasNext())
+ {
+ try
+ {
+ InetAddress host = hostiter.next();
+ Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
+
+ // log in
+ client.set_keyspace(keyspace);
+ if (username != null)
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(IAuthenticator.USERNAME_KEY, username);
+ creds.put(IAuthenticator.PASSWORD_KEY, password);
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+
+ List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+ List<KsDef> ksDefs = client.describe_keyspaces();
+
+ setPartitioner(client.describe_partitioner());
+ Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+ for (TokenRange tr : tokenRanges)
+ {
+ Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+ for (String ep : tr.endpoints)
+ {
+ addRangeForEndpoint(range, InetAddress.getByName(ep));
+ }
+ }
+
+ for (KsDef ksDef : ksDefs)
+ {
+ Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
+ for (CfDef cfDef : ksDef.cf_defs)
+ cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
+ knownCfs.put(ksDef.name, cfs);
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ if (!hostiter.hasNext())
+ throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+ }
+ }
+ }
+
+ public CFMetaData getCFMetaData(String keyspace, String cfName)
+ {
+ Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
+ return cfs != null ? cfs.get(cfName) : null;
+ }
+ }
+
+ public static class NullOutputHandler implements OutputHandler
+ {
+ public void output(String msg) {}
+ public void debug(String msg) {}
+ public void warn(String msg) {}
+ public void warn(String msg, Throwable th) {}
+ }
+}
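The four OUTPUT_* constants above are plain Configuration keys; the defaults (java.io.tmpdir, a 64 MB buffer, unthrottled streaming, zero tolerated failures) come from the constructor. A sketch of overriding them, with purely illustrative values:

import org.apache.hadoop.conf.Configuration;

public final class BulkOutputTuning
{
    public static void apply(Configuration conf)
    {
        // Local staging directory for generated sstables (default: java.io.tmpdir).
        conf.set("mapreduce.output.bulkoutputformat.localdir", "/tmp/bulk-staging");
        // In-memory buffer per sstable writer, in MB (default 64).
        conf.set("mapreduce.output.bulkoutputformat.buffersize", "128");
        // Outbound stream throttle in megabits per second (default 0 = unthrottled).
        conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "200");
        // Hosts allowed to fail during streaming before close() throws (default 0).
        conf.set("mapreduce.output.bulkoutputformat.maxfailedhosts", "1");
    }
}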
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
- implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
{
- @Override
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- private void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
{
return new BulkRecordWriter(context);
}
-
- public static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 8bfc958..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.hadoop.util.Progressable;
-final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
{
- private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
- private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
- private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
- private final Configuration conf;
- private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
- private SSTableSimpleUnsortedWriter writer;
- private SSTableLoader loader;
- private File outputdir;
- private Progressable progress;
- private TaskAttemptContext context;
- private int maxFailures;
-
+ private File outputDir;
+
+
private enum CFType
{
NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
BulkRecordWriter(TaskAttemptContext context)
{
- this(HadoopCompat.getConfiguration(context));
- this.context = context;
+ super(context);
}
BulkRecordWriter(Configuration conf, Progressable progress)
{
- this(conf);
- this.progress = progress;
+ super(conf, progress);
}
BulkRecordWriter(Configuration conf)
{
- Config.setClientMode(true);
- Config.setOutboundBindAny(true);
- this.conf = conf;
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
- maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
- }
-
- private String getOutputLocation() throws IOException
- {
- String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
- if (dir == null)
- throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
- return dir;
+ super(conf);
}
private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private void prepareWriter() throws IOException
{
- if (outputdir == null)
+ if (outputDir == null)
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
//dir must be named by ks/cf for the loader
- outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
- outputdir.mkdirs();
+ outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+ outputDir.mkdirs();
}
if (writer == null)
{
AbstractType<?> subcomparator = null;
- ExternalClient externalClient = null;
- String username = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
if (cfType == CFType.SUPER)
subcomparator = BytesType.instance;
- this.writer = new SSTableSimpleUnsortedWriter(
- outputdir,
+ writer = new SSTableSimpleUnsortedWriter(
+ outputDir,
ConfigHelper.getOutputPartitioner(conf),
ConfigHelper.getOutputKeyspace(conf),
ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
ConfigHelper.getOutputCompressionParamaters(conf));
- externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
- ConfigHelper.getOutputRpcPort(conf),
- username,
- password);
-
- this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+ this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
}
}
@@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
{
setTypes(value.get(0));
prepareWriter();
- writer.newRow(keybuff);
+ SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+ ssWriter.newRow(keybuff);
for (Mutation mut : value)
{
if (cfType == CFType.SUPER)
{
- writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+ ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
if (colType == ColType.COUNTER)
for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
- writer.addCounterColumn(column.name, column.value);
+ ssWriter.addCounterColumn(column.name, column.value);
else
{
for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
{
if(column.ttl == 0)
- writer.addColumn(column.name, column.value, column.timestamp);
+ ssWriter.addColumn(column.name, column.value, column.timestamp);
else
- writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+ ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
}
}
}
else
{
if (colType == ColType.COUNTER)
- writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
+ ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
else
{
if(mut.getColumn_or_supercolumn().column.ttl == 0)
- writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+ ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
else
- writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
+ ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
}
}
if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
HadoopCompat.progress(context);
}
}
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- private void close() throws IOException
- {
- if (writer != null)
- {
- writer.close();
- Future<StreamState> future = loader.stream();
- while (true)
- {
- try
- {
- future.get(1000, TimeUnit.MILLISECONDS);
- break;
- }
- catch (ExecutionException | TimeoutException te)
- {
- if (null != progress)
- progress.progress();
- if (null != context)
- HadoopCompat.progress(context);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
- if (loader.getFailedHosts().size() > 0)
- {
- if (loader.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
- else
- logger.warn("Some hosts failed: " + loader.getFailedHosts());
- }
- }
- }
-
- static class ExternalClient extends SSTableLoader.Client
- {
- private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
- private final String hostlist;
- private final int rpcPort;
- private final String username;
- private final String password;
-
- public ExternalClient(String hostlist, int port, String username, String password)
- {
- super();
- this.hostlist = hostlist;
- this.rpcPort = port;
- this.username = username;
- this.password = password;
- }
-
- public void init(String keyspace)
- {
- Set<InetAddress> hosts = new HashSet<InetAddress>();
- String[] nodes = hostlist.split(",");
- for (String node : nodes)
- {
- try
- {
- hosts.add(InetAddress.getByName(node));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- InetAddress host = hostiter.next();
- Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
- // log in
- client.set_keyspace(keyspace);
- if (username != null)
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(IAuthenticator.USERNAME_KEY, username);
- creds.put(IAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : tokenRanges)
- {
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- for (KsDef ksDef : ksDefs)
- {
- Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
- knownCfs.put(ksDef.name, cfs);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
- }
-
- private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = new TFramedTransport(socket);
- trans.open();
- TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
- return new Cassandra.Client(protocol);
- }
- }
-
- static class NullOutputHandler implements OutputHandler
- {
- public void output(String msg) {}
- public void debug(String msg) {}
- public void warn(String msg) {}
- public void warn(String msg, Throwable th) {}
- }
}
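Note that the Thrift mutation path is unchanged for users: BulkRecordWriter still consumes (ByteBuffer key, List<Mutation>) pairs, it just inherits the streaming and close logic from the new base class. A reducer sketch for that legacy path (column and key names hypothetical):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MutationReducer extends Reducer<Text, Text, ByteBuffer, List<Mutation>>
{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context ctx)
            throws IOException, InterruptedException
    {
        // Build one regular column per input value (timestamp in microseconds).
        Column col = new Column();
        col.setName(ByteBufferUtil.bytes("value"));
        col.setValue(ByteBufferUtil.bytes(values.iterator().next().toString()));
        col.setTimestamp(System.currentTimeMillis() * 1000);

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(col));

        ctx.write(ByteBufferUtil.bytes(key.toString()), Arrays.asList(mutation));
    }
}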
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
new file mode 100644
index 0000000..58e05b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * bound variable values) as CQL rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat},
+ * you need to set the output keyspace and column family in your Hadoop job
+ * Configuration; the {@link ConfigHelper} class, through its
+ * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
+ * simple. In addition, the column family schema and insert statement must be
+ * registered through {@link #setColumnFamilySchema} and
+ * {@link #setColumnFamilyInsertStatement}.
+ * </p>
+ */
+public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+{
+
+ private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
+ private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public CqlBulkRecordWriter getRecordWriter(FileSystem filesystem, JobConf job, String name, Progressable progress) throws IOException
+ {
+ return new CqlBulkRecordWriter(job, progress);
+ }
+
+ /**
+ * Get the {@link RecordWriter} for the given task.
+ *
+ * @param context
+ * the information about the current task.
+ * @return a {@link RecordWriter} to write the output for the job.
+ * @throws IOException
+ */
+ public CqlBulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new CqlBulkRecordWriter(context);
+ }
+
+ public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+ {
+ conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
+ }
+
+ public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+ {
+ conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
+ }
+
+ public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+ {
+ String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
+ if (schema == null)
+ {
+ throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+ }
+ return schema;
+ }
+
+ public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+ {
+ String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
+ if (insert == null)
+ {
+ throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilyInsertStatement.");
+ }
+ return insert;
+ }
+}
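With the format configured, a reducer writes one List<ByteBuffer> of bound values per row, serialized in the placeholder order of the registered insert statement; the key is ignored by the writer. A sketch against the hypothetical users table from the setup above:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CqlBulkReducer extends Reducer<IntWritable, Text, Object, List<ByteBuffer>>
{
    @Override
    protected void reduce(IntWritable id, Iterable<Text> names, Context ctx)
            throws IOException, InterruptedException
    {
        for (Text name : names)
        {
            // Bound positionally to "INSERT INTO ks.users (id, name) VALUES (?, ?)".
            List<ByteBuffer> row = Arrays.asList(ByteBufferUtil.bytes(id.get()),
                                                 ByteBufferUtil.bytes(name.toString()));
            ctx.write(null, row); // the key is unused by CqlBulkRecordWriter
        }
    }
}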
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
new file mode 100644
index 0000000..7a75bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
+import org.apache.cassandra.hadoop.BulkRecordWriter;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra column family. In particular, it applies the bound
+ * variables in the value to the prepared statement, which it associates with
+ * the key, and in turn with the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the CQL queries by the endpoint responsible
+ * for the rows being affected. This allows the CQL queries to be executed in
+ * parallel, directly to a responsible endpoint.
+ * </p>
+ *
+ * @see CqlBulkOutputFormat
+ */
+public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+{
+ private String keyspace;
+ private String columnFamily;
+ private String schema;
+ private String insertStatement;
+ private File outputDir;
+
+ CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
+ {
+ super(context);
+ setConfigs();
+ }
+
+ CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
+ {
+ super(conf, progress);
+ setConfigs();
+ }
+
+ CqlBulkRecordWriter(Configuration conf) throws IOException
+ {
+ super(conf);
+ setConfigs();
+ }
+
+ private void setConfigs() throws IOException
+ {
+ // if anything is missing, exceptions will be thrown here, instead of on write()
+ keyspace = ConfigHelper.getOutputKeyspace(conf);
+ columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+ schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
+ insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
+ outputDir = getColumnFamilyDirectory();
+ }
+
+
+ private void prepareWriter() throws IOException
+ {
+ try
+ {
+ if (writer == null)
+ {
+ writer = CQLSSTableWriter.builder()
+ .forTable(schema)
+ .using(insertStatement)
+ .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+ .inDirectory(outputDir)
+ .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+ .build();
+ }
+ if (loader == null)
+ {
+ ExternalClient externalClient = new ExternalClient(conf);
+
+ externalClient.addKnownCfs(keyspace, schema);
+
+ this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler());
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * The column values must correspond to the order in which
+ * they appear in the insert statement.
+ * <p>
+ * The key is not used, so it can be null or any object.
+ * </p>
+ *
+ * @param key
+ * any object or null.
+ * @param values
+ * the values to write.
+ * @throws IOException
+ */
+ @Override
+ public void write(Object key, List<ByteBuffer> values) throws IOException
+ {
+ prepareWriter();
+ try
+ {
+ ((CQLSSTableWriter) writer).rawAddRow(values);
+
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new IOException("Error adding row with key: " + key, e);
+ }
+ }
+
+ private File getColumnFamilyDirectory() throws IOException
+ {
+ File dir = new File(String.format("%s%s%s%s%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily));
+
+ if (!dir.exists() && !dir.mkdirs())
+ {
+ throw new IOException("Failed to created output directory: " + dir);
+ }
+
+ return dir;
+ }
+
+ public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+ {
+ private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
+
+ public ExternalClient(Configuration conf)
+ {
+ super(conf);
+ }
+
+ public void addKnownCfs(String keyspace, String cql)
+ {
+ Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+
+ if (cfs == null)
+ {
+ cfs = new HashMap<>();
+ knownCqlCfs.put(keyspace, cfs);
+ }
+
+ CFMetaData metadata = CFMetaData.compile(cql, keyspace);
+ cfs.put(metadata.cfName, metadata);
+ }
+
+ @Override
+ public CFMetaData getCFMetaData(String keyspace, String cfName)
+ {
+ CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
+ if (metadata != null)
+ {
+ return metadata;
+ }
+
+ Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+ return cfs != null ? cfs.get(cfName) : null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
private static final String OUTPUT_CQL = "cassandra.output.cql";
-
+
/**
* Set the CQL columns for the input of this job.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 7c89bef..5845175 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
- * binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
* ColumnFamily.
*
* <p>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index e337185..db87226 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.HeapAllocator;
import org.apache.cassandra.utils.Pair;
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
{
protected final File directory;
protected final CFMetaData metadata;
@@ -161,13 +162,6 @@ public abstract class AbstractSSTableSimpleWriter
}
/**
- * Close this writer.
- * This method should be called, otherwise the produced sstables are not
- * guaranteed to be complete (and won't be in practice).
- */
- public abstract void close() throws IOException;
-
- /**
* Package protected for use by AbstractCQLSSTableWriter.
* Not meant to be exposed publicly.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index a7ece70..61990ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
* writer.close();
* </pre>
*/
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
{
private final AbstractSSTableSimpleWriter writer;
private final UpdateStatement insert;
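The two sstable-writer hunks above exist so AbstractBulkRecordWriter can hold either writer behind a single Closeable field; a side effect is that standalone CQLSSTableWriter users can now rely on try-with-resources. A sketch, with the same hypothetical schema and insert strings as before:

import org.apache.cassandra.io.sstable.CQLSSTableWriter;

public class StandaloneBulkWrite
{
    public static void main(String[] args) throws Exception
    {
        String schema = "CREATE TABLE ks.users (id int PRIMARY KEY, name text)";
        String insert = "INSERT INTO ks.users (id, name) VALUES (?, ?)";

        // close() is now guaranteed by try-with-resources; without it the
        // produced sstables are not guaranteed to be complete.
        try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                       .inDirectory("/tmp/ks/users")
                                                       .forTable(schema)
                                                       .using(insert)
                                                       .build())
        {
            writer.addRow(1, "alice");
            writer.addRow(2, "bob");
        }
    }
}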