You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/08/13 18:57:56 UTC
[1/3] git commit: Add CqlOutputFormat patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 6a7235e9e -> 88ad4f451
refs/heads/trunk a26f19266 -> c61784cd8
Add CqlOutputFormat
patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88ad4f45
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88ad4f45
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88ad4f45
Branch: refs/heads/cassandra-2.1
Commit: 88ad4f4514765c62351ea02553769047a6c1e24c
Parents: 6a7235e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Aug 13 11:57:43 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Aug 13 11:57:43 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractBulkOutputFormat.java | 73 ++++++
.../hadoop/AbstractBulkRecordWriter.java | 251 ++++++++++++++++++
.../cassandra/hadoop/BulkOutputFormat.java | 49 +---
.../cassandra/hadoop/BulkRecordWriter.java | 259 ++-----------------
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 2 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 3 +-
9 files changed, 358 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69c4adc..de93018 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
* Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
* SSTableExport uses correct validator to create string representation of partition
keys (CASSANDRA-7498)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ * Shared base for the bulk-loading Hadoop output formats.
+ * <p>
+ * Implements both the {@code mapreduce} {@link OutputFormat} API and the deprecated
+ * {@code org.apache.hadoop.mapred.OutputFormat} interface (used by Hadoop streaming). It rejects jobs
+ * that have no output keyspace configured and uses a {@link NullOutputCommitter}, whose hooks all do nothing.
+ */
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+        implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+    /**
+     * Validates the job configuration ({@code mapreduce} API).
+     *
+     * @throws UnsupportedOperationException if no output keyspace has been configured
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    /** Common validation for both APIs: an output keyspace must be set. */
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            // Name the real ConfigHelper setter so the message is actionable.
+            throw new UnsupportedOperationException("you must set the keyspace with ConfigHelper.setOutputColumnFamily()");
+        }
+    }
+
+    /** Returns a committer whose job and task hooks are all no-ops. */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    @Override
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * {@link OutputCommitter} that does nothing in any of its hooks and reports that tasks never need a commit.
+     */
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        @Override
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        @Override
+        public void cleanupJob(JobContext jobContext) { }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) { }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Hadoop record writers that write SSTables to a local directory and, when closed, stream
+ * them to the cluster through an {@link SSTableLoader}.
+ * <p>
+ * Subclasses create {@link #writer} and {@link #loader}. This class reads the shared bulk-output settings
+ * from the job configuration, reports progress while streaming and enforces the {@link #MAX_FAILED_HOSTS}
+ * limit.
+ */
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+    /** Local directory the SSTables are written to before streaming; defaults to {@code java.io.tmpdir}. */
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    /** Buffer size in megabytes, read into {@link #bufferSize}; defaults to 64. */
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    /** Outbound stream throughput (megabits/s) handed to DatabaseDescriptor; defaults to 0. */
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    /** Number of hosts allowed to fail during streaming before {@code close} throws; defaults to 0. */
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    /** SSTable writer created by the subclass; closed and then streamed in {@code close}. */
+    protected Closeable writer;
+    /** Loader used to stream the written SSTables; {@code close} expects it to be set whenever {@link #writer} is. */
+    protected SSTableLoader loader;
+    /** Progress callback for the deprecated {@code mapred} API; may be null. */
+    protected Progressable progress;
+    /** Task context for the {@code mapreduce} API; may be null. */
+    protected TaskAttemptContext context;
+
+    /** Creates a writer for the {@code mapreduce} API that reports progress through {@code context}. */
+    protected AbstractBulkRecordWriter(TaskAttemptContext context)
+    {
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
+    }
+
+    /** Creates a writer for the deprecated {@code mapred} API that reports progress through {@code progress}. */
+    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+    {
+        this(conf);
+        this.progress = progress;
+    }
+
+    /**
+     * Enables Cassandra client mode and outbound bind-any, applies the stream throttle, and reads the
+     * failed-host limit and buffer size from {@code conf}.
+     */
+    protected AbstractBulkRecordWriter(Configuration conf)
+    {
+        Config.setClientMode(true);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
+
+    /**
+     * Returns the local directory for SSTable output: {@link #OUTPUT_LOCATION} if set, otherwise
+     * {@code java.io.tmpdir}.
+     *
+     * @throws IOException if neither is defined
+     */
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    /**
+     * Closes the SSTable writer, streams the written SSTables with {@link #loader}, and waits for streaming
+     * to finish. While waiting it reports progress about once per second.
+     *
+     * @throws IOException if more than {@link #maxFailures} hosts failed, if streaming failed without any
+     *         host being recorded as failed, or if the thread is interrupted while waiting
+     */
+    private void close() throws IOException
+    {
+        if (writer == null)
+            return;
+
+        writer.close();
+        Future<StreamState> future = loader.stream();
+        ExecutionException streamFailure = null;
+        while (true)
+        {
+            try
+            {
+                future.get(1000, TimeUnit.MILLISECONDS);
+                break;
+            }
+            catch (TimeoutException te)
+            {
+                // Still streaming: keep reporting progress while we wait.
+                if (null != progress)
+                    progress.progress();
+                if (null != context)
+                    HadoopCompat.progress(context);
+            }
+            catch (ExecutionException ee)
+            {
+                // The future has completed exceptionally. Every further get() would rethrow immediately,
+                // so retrying would spin forever. Stop here and let the failed-host check below decide.
+                streamFailure = ee;
+                break;
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+        }
+
+        if (!loader.getFailedHosts().isEmpty())
+        {
+            if (loader.getFailedHosts().size() > maxFailures)
+                throw new IOException("Too many hosts failed: " + loader.getFailedHosts(), streamFailure);
+            logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+        }
+        else if (streamFailure != null)
+        {
+            throw new IOException("Streaming failed", streamFailure.getCause());
+        }
+    }
+
+    /**
+     * {@link SSTableLoader.Client} that learns the token ranges and schema over Thrift from the hosts
+     * configured as the job's output initial address. It tries each host until one succeeds.
+     */
+    public static class ExternalClient extends SSTableLoader.Client
+    {
+        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
+        private final Configuration conf;
+        private final String hostlist;
+        private final int rpcPort;
+        private final String username;
+        private final String password;
+
+        /** Reads the output address list, RPC port and optional credentials from {@code conf}. */
+        public ExternalClient(Configuration conf)
+        {
+            super();
+            this.conf = conf;
+            this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
+            this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
+            this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
+            this.password = ConfigHelper.getOutputKeyspacePassword(conf);
+        }
+
+        /**
+         * Resolves the comma-separated host list, then connects to each host in turn until one returns the
+         * ring for {@code keyspace} and the definitions of all keyspaces. Every Thrift connection opened
+         * here is closed before returning.
+         *
+         * @throws RuntimeException if no host list is configured, a host name cannot be resolved, or every
+         *         host fails
+         */
+        public void init(String keyspace)
+        {
+            if (hostlist == null)
+                throw new RuntimeException("Output initial address is not configured");
+
+            Set<InetAddress> hosts = new HashSet<InetAddress>();
+            for (String node : hostlist.split(","))
+            {
+                try
+                {
+                    // Tolerate "host1, host2" style lists.
+                    hosts.add(InetAddress.getByName(node.trim()));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
+            {
+                Cassandra.Client client = null;
+                try
+                {
+                    InetAddress host = hostiter.next();
+                    client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
+
+                    // log in
+                    client.set_keyspace(keyspace);
+                    if (username != null)
+                    {
+                        Map<String, String> creds = new HashMap<String, String>();
+                        creds.put(IAuthenticator.USERNAME_KEY, username);
+                        creds.put(IAuthenticator.PASSWORD_KEY, password);
+                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+                        client.login(authRequest);
+                    }
+
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
+
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
+                {
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+                }
+                finally
+                {
+                    // Release the Thrift connection whether or not this host succeeded.
+                    if (client != null)
+                        client.getOutputProtocol().getTransport().close();
+                }
+            }
+        }
+
+        /** Returns the metadata learned in {@code init} for the given table, or null if unknown. */
+        public CFMetaData getCFMetaData(String keyspace, String cfName)
+        {
+            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
+            return cfs != null ? cfs.get(cfName) : null;
+        }
+    }
+
+    /** {@link OutputHandler} that discards every message. */
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
- implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
{
- @Override
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- private void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
{
return new BulkRecordWriter(context);
}
-
- public static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d6136a2..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.hadoop.util.Progressable;
-final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
{
- private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
- private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
- private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
- private final Configuration conf;
- private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
- private SSTableSimpleUnsortedWriter writer;
- private SSTableLoader loader;
- private File outputdir;
- private Progressable progress;
- private TaskAttemptContext context;
- private int maxFailures;
-
+ private File outputDir;
+
+
private enum CFType
{
NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
BulkRecordWriter(TaskAttemptContext context)
{
- this(HadoopCompat.getConfiguration(context));
- this.context = context;
+ super(context);
}
BulkRecordWriter(Configuration conf, Progressable progress)
{
- this(conf);
- this.progress = progress;
+ super(conf, progress);
}
BulkRecordWriter(Configuration conf)
{
- Config.setClientMode(true);
- Config.setOutboundBindAny(true);
- this.conf = conf;
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
- maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
- }
-
- private String getOutputLocation() throws IOException
- {
- String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
- if (dir == null)
- throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
- return dir;
+ super(conf);
}
private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private void prepareWriter() throws IOException
{
- if (outputdir == null)
+ if (outputDir == null)
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
//dir must be named by ks/cf for the loader
- outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
- outputdir.mkdirs();
+ outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+ outputDir.mkdirs();
}
if (writer == null)
{
AbstractType<?> subcomparator = null;
- ExternalClient externalClient = null;
- String username = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
if (cfType == CFType.SUPER)
subcomparator = BytesType.instance;
- this.writer = new SSTableSimpleUnsortedWriter(
- outputdir,
+ writer = new SSTableSimpleUnsortedWriter(
+ outputDir,
ConfigHelper.getOutputPartitioner(conf),
ConfigHelper.getOutputKeyspace(conf),
ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
ConfigHelper.getOutputCompressionParamaters(conf));
- externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
- ConfigHelper.getOutputRpcPort(conf),
- username,
- password);
-
- this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+ this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
}
}
@@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
{
setTypes(value.get(0));
prepareWriter();
- writer.newRow(keybuff);
+ SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+ ssWriter.newRow(keybuff);
for (Mutation mut : value)
{
if (cfType == CFType.SUPER)
{
- writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+ ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
if (colType == ColType.COUNTER)
for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
- writer.addCounterColumn(column.name, column.value);
+ ssWriter.addCounterColumn(column.name, column.value);
else
{
for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
{
if(column.ttl == 0)
- writer.addColumn(column.name, column.value, column.timestamp);
+ ssWriter.addColumn(column.name, column.value, column.timestamp);
else
- writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+ ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
}
}
}
else
{
if (colType == ColType.COUNTER)
- writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
+ ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
else
{
if(mut.getColumn_or_supercolumn().column.ttl == 0)
- writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+ ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
else
- writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
+ ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
}
}
if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
HadoopCompat.progress(context);
}
}
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- private void close() throws IOException
- {
- if (writer != null)
- {
- writer.close();
- Future<StreamState> future = loader.stream();
- while (true)
- {
- try
- {
- future.get(1000, TimeUnit.MILLISECONDS);
- break;
- }
- catch (ExecutionException | TimeoutException te)
- {
- if (null != progress)
- progress.progress();
- if (null != context)
- HadoopCompat.progress(context);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
- if (loader.getFailedHosts().size() > 0)
- {
- if (loader.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
- else
- logger.warn("Some hosts failed: {}", loader.getFailedHosts());
- }
- }
- }
-
- static class ExternalClient extends SSTableLoader.Client
- {
- private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
- private final String hostlist;
- private final int rpcPort;
- private final String username;
- private final String password;
-
- public ExternalClient(String hostlist, int port, String username, String password)
- {
- super();
- this.hostlist = hostlist;
- this.rpcPort = port;
- this.username = username;
- this.password = password;
- }
-
- public void init(String keyspace)
- {
- Set<InetAddress> hosts = new HashSet<InetAddress>();
- String[] nodes = hostlist.split(",");
- for (String node : nodes)
- {
- try
- {
- hosts.add(InetAddress.getByName(node));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- InetAddress host = hostiter.next();
- Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
- // log in
- client.set_keyspace(keyspace);
- if (username != null)
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(IAuthenticator.USERNAME_KEY, username);
- creds.put(IAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : tokenRanges)
- {
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- for (KsDef ksDef : ksDefs)
- {
- Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
- knownCfs.put(ksDef.name, cfs);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
- }
-
- private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = new TFramedTransport(socket);
- trans.open();
- TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
- return new Cassandra.Client(protocol);
- }
- }
-
- static class NullOutputHandler implements OutputHandler
- {
- public void output(String msg) {}
- public void debug(String msg) {}
- public void warn(String msg) {}
- public void warn(String msg, Throwable th) {}
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
private static final String OUTPUT_CQL = "cassandra.output.cql";
-
+
/**
* Set the CQL columns for the input of this job.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index f8613ba..0d09ca2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
- * binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
* ColumnFamily.
*
* <p>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 1b407c5..ae8300c 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
{
protected final File directory;
protected final CFMetaData metadata;
@@ -162,13 +163,6 @@ public abstract class AbstractSSTableSimpleWriter
}
/**
- * Close this writer.
- * This method should be called, otherwise the produced sstables are not
- * guaranteed to be complete (and won't be in practice).
- */
- public abstract void close() throws IOException;
-
- /**
* Package protected for use by AbstractCQLSSTableWriter.
* Not meant to be exposed publicly.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 6993b19..427d2d4 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
* writer.close();
* </pre>
*/
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
{
private final AbstractSSTableSimpleWriter writer;
private final UpdateStatement insert;
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c61784cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c61784cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c61784cd
Branch: refs/heads/trunk
Commit: c61784cd8f183c816229bed6add0f642a45e3449
Parents: a26f192 88ad4f4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Aug 13 11:57:51 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Aug 13 11:57:51 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractBulkOutputFormat.java | 73 ++++++
.../hadoop/AbstractBulkRecordWriter.java | 251 ++++++++++++++++++
.../cassandra/hadoop/BulkOutputFormat.java | 49 +---
.../cassandra/hadoop/BulkRecordWriter.java | 259 ++-----------------
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 2 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 3 +-
9 files changed, 358 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c61784cd/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9063813,de93018..4ed4d51
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
+3.0
+ * Support pure user-defined functions (CASSANDRA-7395)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+
+
2.1.1
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
* Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
* SSTableExport uses correct validator to create string representation of partition
keys (CASSANDRA-7498)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c61784cd/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
[2/3] git commit: Add CqlOutputFormat patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927
Posted by jb...@apache.org.
Add CqlOutputFormat
patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88ad4f45
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88ad4f45
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88ad4f45
Branch: refs/heads/trunk
Commit: 88ad4f4514765c62351ea02553769047a6c1e24c
Parents: 6a7235e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Aug 13 11:57:43 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Aug 13 11:57:43 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractBulkOutputFormat.java | 73 ++++++
.../hadoop/AbstractBulkRecordWriter.java | 251 ++++++++++++++++++
.../cassandra/hadoop/BulkOutputFormat.java | 49 +---
.../cassandra/hadoop/BulkRecordWriter.java | 259 ++-----------------
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlOutputFormat.java | 2 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 3 +-
9 files changed, 358 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69c4adc..de93018 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
* Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
* SSTableExport uses correct validator to create string representation of partition
keys (CASSANDRA-7498)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+ implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ {
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
+ }
+
+ private void checkOutputSpecs(Configuration conf)
+ {
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+ }
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /** Fills the deprecated OutputFormat interface for streaming. */
+ @Deprecated
+ public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+ {
+ checkOutputSpecs(job);
+ }
+
+ public static class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+ public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+ public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+ public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+ private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+
+ protected final Configuration conf;
+ protected final int maxFailures;
+ protected final int bufferSize;
+ protected Closeable writer;
+ protected SSTableLoader loader;
+ protected Progressable progress;
+ protected TaskAttemptContext context;
+
+ protected AbstractBulkRecordWriter(TaskAttemptContext context)
+ {
+ this(HadoopCompat.getConfiguration(context));
+ this.context = context;
+ }
+
+ protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+ {
+ this(conf);
+ this.progress = progress;
+ }
+
+ protected AbstractBulkRecordWriter(Configuration conf)
+ {
+ Config.setClientMode(true);
+ Config.setOutboundBindAny(true);
+ this.conf = conf;
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+ bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+ }
+
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ close();
+ }
+
+ /** Fills the deprecated RecordWriter interface for streaming. */
+ @Deprecated
+ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+ {
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ if (writer != null)
+ {
+ writer.close();
+ Future<StreamState> future = loader.stream();
+ while (true)
+ {
+ try
+ {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ break;
+ }
+ catch (ExecutionException | TimeoutException te)
+ {
+ if (null != progress)
+ progress.progress();
+ if (null != context)
+ HadoopCompat.progress(context);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ if (loader.getFailedHosts().size() > 0)
+ {
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+ }
+ }
+ }
+
+ public static class ExternalClient extends SSTableLoader.Client
+ {
+ private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
+ private final Configuration conf;
+ private final String hostlist;
+ private final int rpcPort;
+ private final String username;
+ private final String password;
+
+ public ExternalClient(Configuration conf)
+ {
+ super();
+ this.conf = conf;
+ this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
+ this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
+ this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
+ this.password = ConfigHelper.getOutputKeyspacePassword(conf);
+ }
+
+ public void init(String keyspace)
+ {
+ Set<InetAddress> hosts = new HashSet<InetAddress>();
+ String[] nodes = hostlist.split(",");
+ for (String node : nodes)
+ {
+ try
+ {
+ hosts.add(InetAddress.getByName(node));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ Iterator<InetAddress> hostiter = hosts.iterator();
+ while (hostiter.hasNext())
+ {
+ try
+ {
+ InetAddress host = hostiter.next();
+ Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
+
+ // log in
+ client.set_keyspace(keyspace);
+ if (username != null)
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(IAuthenticator.USERNAME_KEY, username);
+ creds.put(IAuthenticator.PASSWORD_KEY, password);
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+
+ List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+ List<KsDef> ksDefs = client.describe_keyspaces();
+
+ setPartitioner(client.describe_partitioner());
+ Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+ for (TokenRange tr : tokenRanges)
+ {
+ Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+ for (String ep : tr.endpoints)
+ {
+ addRangeForEndpoint(range, InetAddress.getByName(ep));
+ }
+ }
+
+ for (KsDef ksDef : ksDefs)
+ {
+ Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
+ for (CfDef cfDef : ksDef.cf_defs)
+ cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
+ knownCfs.put(ksDef.name, cfs);
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ if (!hostiter.hasNext())
+ throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+ }
+ }
+ }
+
+ public CFMetaData getCFMetaData(String keyspace, String cfName)
+ {
+ Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
+ return cfs != null ? cfs.get(cfName) : null;
+ }
+ }
+
+ public static class NullOutputHandler implements OutputHandler
+ {
+ public void output(String msg) {}
+ public void debug(String msg) {}
+ public void warn(String msg) {}
+ public void warn(String msg, Throwable th) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
- implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
{
- @Override
- public void checkOutputSpecs(JobContext context)
- {
- checkOutputSpecs(HadoopCompat.getConfiguration(context));
- }
-
- private void checkOutputSpecs(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new NullOutputCommitter();
- }
-
- /** Fills the deprecated OutputFormat interface for streaming. */
- @Deprecated
- public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
- {
- checkOutputSpecs(job);
- }
-
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
{
return new BulkRecordWriter(context);
}
-
- public static class NullOutputCommitter extends OutputCommitter
- {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext)
- {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d6136a2..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.hadoop.util.Progressable;
-final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
{
- private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
- private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
- private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
- private final Configuration conf;
- private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
- private SSTableSimpleUnsortedWriter writer;
- private SSTableLoader loader;
- private File outputdir;
- private Progressable progress;
- private TaskAttemptContext context;
- private int maxFailures;
-
+ private File outputDir;
+
+
private enum CFType
{
NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
BulkRecordWriter(TaskAttemptContext context)
{
- this(HadoopCompat.getConfiguration(context));
- this.context = context;
+ super(context);
}
BulkRecordWriter(Configuration conf, Progressable progress)
{
- this(conf);
- this.progress = progress;
+ super(conf, progress);
}
BulkRecordWriter(Configuration conf)
{
- Config.setClientMode(true);
- Config.setOutboundBindAny(true);
- this.conf = conf;
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
- maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
- }
-
- private String getOutputLocation() throws IOException
- {
- String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
- if (dir == null)
- throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
- return dir;
+ super(conf);
}
private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private void prepareWriter() throws IOException
{
- if (outputdir == null)
+ if (outputDir == null)
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
//dir must be named by ks/cf for the loader
- outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
- outputdir.mkdirs();
+ outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+ outputDir.mkdirs();
}
if (writer == null)
{
AbstractType<?> subcomparator = null;
- ExternalClient externalClient = null;
- String username = ConfigHelper.getOutputKeyspaceUserName(conf);
- String password = ConfigHelper.getOutputKeyspacePassword(conf);
if (cfType == CFType.SUPER)
subcomparator = BytesType.instance;
- this.writer = new SSTableSimpleUnsortedWriter(
- outputdir,
+ writer = new SSTableSimpleUnsortedWriter(
+ outputDir,
ConfigHelper.getOutputPartitioner(conf),
ConfigHelper.getOutputKeyspace(conf),
ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
ConfigHelper.getOutputCompressionParamaters(conf));
- externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
- ConfigHelper.getOutputRpcPort(conf),
- username,
- password);
-
- this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+ this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
}
}
@@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
{
setTypes(value.get(0));
prepareWriter();
- writer.newRow(keybuff);
+ SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+ ssWriter.newRow(keybuff);
for (Mutation mut : value)
{
if (cfType == CFType.SUPER)
{
- writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+ ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
if (colType == ColType.COUNTER)
for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
- writer.addCounterColumn(column.name, column.value);
+ ssWriter.addCounterColumn(column.name, column.value);
else
{
for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
{
if(column.ttl == 0)
- writer.addColumn(column.name, column.value, column.timestamp);
+ ssWriter.addColumn(column.name, column.value, column.timestamp);
else
- writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+ ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
}
}
}
else
{
if (colType == ColType.COUNTER)
- writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
+ ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
else
{
if(mut.getColumn_or_supercolumn().column.ttl == 0)
- writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+ ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
else
- writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
+ ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
}
}
if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
HadoopCompat.progress(context);
}
}
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException
- {
- close();
- }
-
- /** Fills the deprecated RecordWriter interface for streaming. */
- @Deprecated
- public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
- {
- close();
- }
-
- private void close() throws IOException
- {
- if (writer != null)
- {
- writer.close();
- Future<StreamState> future = loader.stream();
- while (true)
- {
- try
- {
- future.get(1000, TimeUnit.MILLISECONDS);
- break;
- }
- catch (ExecutionException | TimeoutException te)
- {
- if (null != progress)
- progress.progress();
- if (null != context)
- HadoopCompat.progress(context);
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
- if (loader.getFailedHosts().size() > 0)
- {
- if (loader.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
- else
- logger.warn("Some hosts failed: {}", loader.getFailedHosts());
- }
- }
- }
-
- static class ExternalClient extends SSTableLoader.Client
- {
- private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
- private final String hostlist;
- private final int rpcPort;
- private final String username;
- private final String password;
-
- public ExternalClient(String hostlist, int port, String username, String password)
- {
- super();
- this.hostlist = hostlist;
- this.rpcPort = port;
- this.username = username;
- this.password = password;
- }
-
- public void init(String keyspace)
- {
- Set<InetAddress> hosts = new HashSet<InetAddress>();
- String[] nodes = hostlist.split(",");
- for (String node : nodes)
- {
- try
- {
- hosts.add(InetAddress.getByName(node));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
- Iterator<InetAddress> hostiter = hosts.iterator();
- while (hostiter.hasNext())
- {
- try
- {
- InetAddress host = hostiter.next();
- Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
- // log in
- client.set_keyspace(keyspace);
- if (username != null)
- {
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(IAuthenticator.USERNAME_KEY, username);
- creds.put(IAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- client.login(authRequest);
- }
-
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
-
- setPartitioner(client.describe_partitioner());
- Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
- for (TokenRange tr : tokenRanges)
- {
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
- for (String ep : tr.endpoints)
- {
- addRangeForEndpoint(range, InetAddress.getByName(ep));
- }
- }
-
- for (KsDef ksDef : ksDefs)
- {
- Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
- knownCfs.put(ksDef.name, cfs);
- }
- break;
- }
- catch (Exception e)
- {
- if (!hostiter.hasNext())
- throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
- }
- }
- }
-
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
- }
-
- private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = new TFramedTransport(socket);
- trans.open();
- TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
- return new Cassandra.Client(protocol);
- }
- }
-
- static class NullOutputHandler implements OutputHandler
- {
- public void output(String msg) {}
- public void debug(String msg) {}
- public void warn(String msg) {}
- public void warn(String msg, Throwable th) {}
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
private static final String OUTPUT_CQL = "cassandra.output.cql";
-
+
/**
* Set the CQL columns for the input of this job.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index f8613ba..0d09ca2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
- * binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
* ColumnFamily.
*
* <p>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 1b407c5..ae8300c 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
{
protected final File directory;
protected final CFMetaData metadata;
@@ -162,13 +163,6 @@ public abstract class AbstractSSTableSimpleWriter
}
/**
- * Close this writer.
- * This method should be called, otherwise the produced sstables are not
- * guaranteed to be complete (and won't be in practice).
- */
- public abstract void close() throws IOException;
-
- /**
* Package protected for use by AbstractCQLSSTableWriter.
* Not meant to be exposed publicly.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 6993b19..427d2d4 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
* writer.close();
* </pre>
*/
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
{
private final AbstractSSTableSimpleWriter writer;
private final UpdateStatement insert;