You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/30 17:38:38 UTC
svn commit: r1208499 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/hadoop/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/streaming/
src/java/or...
Author: brandonwilliams
Date: Wed Nov 30 16:38:36 2011
New Revision: 1208499
URL: http://svn.apache.org/viewvc?rev=1208499&view=rev
Log:
Bulk loader is no longer a fat client; add a Hadoop bulk loader output format.
Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-3045
Added:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java (with props)
cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java (with props)
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Nov 30 16:38:36 2011
@@ -9,6 +9,8 @@
(CASSANDRA-3116)
* recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
* Use faster bytes comparison (CASSANDRA-3434)
+ * Bulk loader is no longer a fat client, (HADOOP) bulk load output format
+ (CASSANDRA-3045)
1.0.5
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Nov 30 16:38:36 2011
@@ -109,7 +109,7 @@ public class Config
public RequestSchedulerId request_scheduler_id;
public RequestSchedulerOptions request_scheduler_options;
- public EncryptionOptions encryption_options;
+ public EncryptionOptions encryption_options = new EncryptionOptions();
public Integer index_interval = 128;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Nov 30 16:38:36 2011
@@ -107,30 +107,38 @@ public class DatabaseDescriptor
return url;
}
+
+    /**
+     * Replaces the current configuration with a default-constructed {@link Config},
+     * for callers (such as the Hadoop bulk record writer) that should not depend on
+     * cassandra.yaml.
+     * <p>
+     * NOTE(review): invoking this static method first triggers this class's static
+     * initializer, which loads the yaml whenever {@code conf} is still null at that
+     * point. Unless {@code conf} is initialized before the static block runs, the yaml
+     * is therefore still read before this method can install the defaults -- confirm.
+     */
+    public static void initDefaultsOnly()
+    {
+        conf = new Config();
+    }
static
{
try
{
- URL url = getStorageConfigURL();
- logger.info("Loading settings from " + url);
-
- InputStream input = null;
- try
- {
- input = url.openStream();
- }
- catch (IOException e)
+ // only load yaml if conf wasn't already set
+ if (conf == null)
{
- // getStorageConfigURL should have ruled this out
- throw new AssertionError(e);
+ URL url = getStorageConfigURL();
+ logger.info("Loading settings from " + url);
+ InputStream input = null;
+ try
+ {
+ input = url.openStream();
+ }
+ catch (IOException e)
+ {
+ // getStorageConfigURL should have ruled this out
+ throw new AssertionError(e);
+ }
+ org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
+ TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
+ seedDesc.putMapPropertyType("parameters", String.class, String.class);
+ constructor.addTypeDescription(seedDesc);
+ Yaml yaml = new Yaml(new Loader(constructor));
+ conf = (Config)yaml.load(input);
}
- org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
- TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
- seedDesc.putMapPropertyType("parameters", String.class, String.class);
- constructor.addTypeDescription(seedDesc);
- Yaml yaml = new Yaml(new Loader(constructor));
- conf = (Config)yaml.load(input);
if (conf.commitlog_sync == null)
{
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java?rev=1208499&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java Wed Nov 30 16:38:36 2011
@@ -0,0 +1,99 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ * Hadoop output format whose record writer ({@link BulkRecordWriter}) writes rows
+ * into local SSTables and streams them to the cluster when the writer is closed.
+ * <p>
+ * Implements both the new (mapreduce) and the deprecated (mapred) OutputFormat APIs.
+ * The output keyspace and column family must be present in the job configuration
+ * (see {@link ConfigHelper}).
+ */
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(BulkOutputFormat.class);
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(context.getConfiguration());
+    }
+
+    /**
+     * Verifies that the output keyspace and column family are configured.
+     *
+     * @throws UnsupportedOperationException if either one is missing
+     */
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setOutputColumnFamily()");
+        }
+    }
+
+    /** Nothing needs committing: data reaches the cluster when the record writer is closed. */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming; {@code progress} is not used. */
+    @Deprecated
+    public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+    {
+        return new BulkRecordWriter(job);
+    }
+
+    @Override
+    public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new BulkRecordWriter(context);
+    }
+
+    /** An {@link OutputCommitter} whose every hook is a no-op and which never requests a task commit. */
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        @Override
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        @Override
+        public void cleanupJob(JobContext jobContext) { }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) { }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}
Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java?rev=1208499&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java Wed Nov 30 16:38:36 2011
@@ -0,0 +1,232 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+
+/**
+ * Record writer that buffers rows into local SSTables with an
+ * {@link SSTableSimpleUnsortedWriter} and, on close, streams them to the cluster
+ * through an {@link SSTableLoader}.
+ * <p>
+ * Job configuration keys read here:
+ * <ul>
+ * <li>mapreduce.output.bulkoutputformat.localdir: directory for the temporary sstables
+ *     (falls back to mapred.local.dir)</li>
+ * <li>mapreduce.output.bulkoutputformat.buffersize: writer buffer size in MB (default 64)</li>
+ * <li>mapreduce.output.bulkoutputformat.streamthrottlembits: outbound stream throttle (default 0)</li>
+ * <li>mapreduce.output.bulkoutputformat.issuper: whether the target CF is a super CF (default false)</li>
+ * </ul>
+ */
+final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
+implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+{
+    private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper";
+    private final Configuration conf;
+    private final boolean isSuper;
+    private final SSTableSimpleUnsortedWriter writer;
+    private final SSTableLoader loader;
+
+    static
+    {
+        // make sure DD doesn't load yaml.
+        // NOTE(review): calling initDefaultsOnly() first runs DatabaseDescriptor's static
+        // initializer, which only skips the yaml when its conf field is already non-null;
+        // confirm the yaml is really not read on task nodes.
+        DatabaseDescriptor.initDefaultsOnly();
+    }
+
+    BulkRecordWriter(TaskAttemptContext context) throws IOException
+    {
+        this(context.getConfiguration());
+    }
+
+    /**
+     * Creates the local sstable writer and the loader that will stream its output.
+     *
+     * @throws IOException if no output directory is configured or it cannot be created
+     */
+    BulkRecordWriter(Configuration conf) throws IOException
+    {
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        String keyspace = ConfigHelper.getOutputKeyspace(conf);
+        // the directory must be named after the keyspace for the loader
+        File outputdir = new File(getOutputLocation() + File.separator + keyspace);
+        // mkdirs() also returns false when the directory already exists, so re-check
+        if (!outputdir.mkdirs() && !outputdir.isDirectory())
+            throw new IOException("Unable to create output directory " + outputdir);
+        this.isSuper = Boolean.parseBoolean(conf.get(IS_SUPERCF));
+        AbstractType subcomparator = isSuper ? BytesType.instance : null;
+        this.writer = new SSTableSimpleUnsortedWriter(
+                outputdir,
+                keyspace,
+                ConfigHelper.getOutputColumnFamily(conf),
+                BytesType.instance,
+                subcomparator,
+                Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")));
+        this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf)), new NullOutputHandler());
+    }
+
+    /** Returns the configured local output directory, falling back to mapred.local.dir. */
+    private String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, conf.get("mapred.local.dir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting mapred.local.dir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
+    /**
+     * Appends one row to the local sstable writer. Each mutation must carry a column
+     * (or, for super column families, a super column); deletions are not supported.
+     */
+    @Override
+    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
+    {
+        writer.newRow(keybuff);
+        for (Mutation mut : value)
+        {
+            ColumnOrSuperColumn cosc = mut.getColumn_or_supercolumn();
+            if (cosc == null)
+                throw new IOException("Only column insertions are supported by the bulk output format, got " + mut);
+            if (isSuper)
+            {
+                SuperColumn superColumn = cosc.getSuper_column();
+                writer.newSuperColumn(superColumn.name);
+                for (Column column : superColumn.columns)
+                    writer.addColumn(column.name, column.value, column.timestamp);
+            }
+            else
+            {
+                Column column = cosc.getColumn();
+                writer.addColumn(column.name, column.value, column.timestamp);
+            }
+        }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    /** Flushes the local sstables and blocks until the loader has streamed them. */
+    private void close() throws IOException
+    {
+        writer.close();
+        try
+        {
+            loader.stream().get();
+        }
+        catch (InterruptedException e)
+        {
+            // keep the interrupt visible to the caller before translating the exception
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Loader client that learns the ring layout, partitioner and schema over Thrift
+     * from the first reachable host in a comma-separated host list.
+     */
+    static class ExternalClient extends SSTableLoader.Client
+    {
+        private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
+        private final String hostlist;
+        private final int rpcPort;
+
+        public ExternalClient(String hostlist, int port)
+        {
+            super();
+            this.hostlist = hostlist;
+            this.rpcPort = port;
+        }
+
+        @Override
+        public void init(String keyspace)
+        {
+            Set<InetAddress> hosts = new HashSet<InetAddress>();
+            for (String node : hostlist.split(","))
+            {
+                try
+                {
+                    // tolerate "host1, host2" style lists
+                    hosts.add(InetAddress.getByName(node.trim()));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
+            {
+                InetAddress host = hostiter.next();
+                TTransport transport = null;
+                try
+                {
+                    transport = openTransport(host.getHostAddress(), rpcPort);
+                    Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
+
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Set<String> cfs = new HashSet<String>();
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.add(cfDef.name);
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
+                {
+                    // fall through to the next host; give up only once every host has failed
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+                }
+                finally
+                {
+                    // the connection is only needed for these metadata calls
+                    if (transport != null)
+                        transport.close();
+                }
+            }
+        }
+
+        public boolean validateColumnFamily(String keyspace, String cfName)
+        {
+            Set<String> cfs = knownCfs.get(keyspace);
+            return cfs != null && cfs.contains(cfName);
+        }
+
+        /** Opens a framed Thrift transport to host:port; the caller must close it. */
+        private static TTransport openTransport(String host, int port) throws TTransportException
+        {
+            TTransport transport = new TFramedTransport(new TSocket(host, port));
+            transport.open();
+            return transport;
+        }
+    }
+
+    /** Output handler that discards all loader progress and debug messages. */
+    static class NullOutputHandler implements SSTableLoader.OutputHandler
+    {
+        public void output(String msg) {}
+
+        public void debug(String msg) {}
+    }
+}
Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java Wed Nov 30 16:38:36 2011
@@ -27,9 +27,11 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -84,7 +86,7 @@ public class SSTableLoader
try
{
- sstables.add(SSTableReader.open(desc, components, null, StorageService.getPartitioner()));
+ sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
}
catch (IOException e)
{
@@ -227,15 +229,15 @@ public class SSTableLoader
public static abstract class Client
{
private final Map<InetAddress, Collection<Range>> endpointToRanges = new HashMap<InetAddress, Collection<Range>>();
+ private IPartitioner partitioner;
/**
* Initialize the client.
* Perform any step necessary so that after the call to the this
* method:
- * * StorageService is correctly initialized (so that gossip and
- * messaging service is too)
+ * * partitioner is initialized
* * getEndpointToRangesMap() returns a correct map
- * This method is guaranted to be called before any other method of a
+ * This method is guaranteed to be called before any other method of a
* client.
*/
public abstract void init(String keyspace);
@@ -256,6 +258,16 @@ public class SSTableLoader
return endpointToRanges;
}
+    /**
+     * Instantiates the partitioner named by {@code partclass} and records it for
+     * later use by {@link #getPartitioner()}.
+     *
+     * @throws ConfigurationException if the class cannot be instantiated as a partitioner
+     */
+    protected void setPartitioner(String partclass) throws ConfigurationException
+    {
+        IPartitioner resolved = FBUtilities.newPartitioner(partclass);
+        partitioner = resolved;
+    }
+
+    /**
+     * @return the partitioner installed via {@link #setPartitioner(String)}, or null
+     *         if none has been set yet
+     */
+    public IPartitioner getPartitioner()
+    {
+        return this.partitioner;
+    }
+
protected void addRangeForEndpoint(Range range, InetAddress endpoint)
{
Collection<Range> ranges = endpointToRanges.get(endpoint);
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Nov 30 16:38:36 2011
@@ -40,7 +40,7 @@ public class Header
serializer_ = new HeaderSerializer();
}
- static IVersionedSerializer<Header> serializer()
+ public static IVersionedSerializer<Header> serializer()
{
return serializer_;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Nov 30 16:38:36 2011
@@ -30,7 +30,7 @@ public class Message
private final byte[] body_;
private final transient int version;
- Message(Header header, byte[] body, int version)
+ public Message(Header header, byte[] body, int version)
{
assert header != null;
assert body != null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Nov 30 16:38:36 2011
@@ -162,7 +162,7 @@ public class OutboundTcpConnection exten
}
}
- static void write(Message message, String id, DataOutputStream out) throws IOException
+ public static void write(Message message, String id, DataOutputStream out) throws IOException
{
/*
Setting up the protocol header. This is 4 bytes long
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Nov 30 16:38:36 2011
@@ -18,9 +18,7 @@
package org.apache.cassandra.streaming;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
@@ -30,7 +28,11 @@ import org.apache.cassandra.gms.Gossiper
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.OutboundTcpConnection;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
@@ -54,12 +56,15 @@ public class FileStreamTask extends Wrap
// communication socket
private Socket socket;
- // socket's output stream
+ // socket's output/input stream
private OutputStream output;
+ private OutputStream compressedoutput;
+ private DataInputStream input;
// allocate buffer to use for transfers only once
private final byte[] transferBuffer = new byte[CHUNK_SIZE];
// outbound global throughput limiter
private final Throttle throttle;
+ private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
public FileStreamTask(StreamHeader header, InetAddress to)
{
@@ -89,6 +94,12 @@ public class FileStreamTask extends Wrap
// successfully connected: stream.
// (at this point, if we fail, it is the receiver's job to re-request)
stream();
+ if (StreamOutSession.get(to, header.sessionId).getFiles().size() == 0)
+ {
+ // we are the last of our kind, receive the final confirmation before closing
+ receiveReply();
+ logger.info("Finished streaming session to {}", to);
+ }
}
finally
{
@@ -125,7 +136,7 @@ public class FileStreamTask extends Wrap
: RandomAccessReader.open(new File(header.file.getFilename()), true);
// setting up data compression stream
- output = new LZFOutputStream(output);
+ compressedoutput = new LZFOutputStream(output);
try
{
@@ -149,11 +160,13 @@ public class FileStreamTask extends Wrap
}
// make sure that current section is send
- output.flush();
+ compressedoutput.flush();
if (logger.isDebugEnabled())
logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
}
+ // receive reply confirmation
+ receiveReply();
}
finally
{
@@ -162,6 +175,25 @@ public class FileStreamTask extends Wrap
}
}
+    /**
+     * Reads one framed message from the stream socket and passes it to the
+     * stream-reply handler.
+     *
+     * @throws IOException if reading fails, or if the peer sends a stream frame or any
+     *         message other than STREAM_REPLY (checked explicitly because this is
+     *         network input and assertions are normally disabled)
+     */
+    private void receiveReply() throws IOException
+    {
+        MessagingService.validateMagic(input.readInt());
+        int msheader = input.readInt();
+        if (MessagingService.getBits(msheader, 3, 1) != 0)
+            throw new IOException("Stream received before stream reply");
+        int version = MessagingService.getBits(msheader, 15, 8);
+
+        input.readInt(); // total size: unused, but must be consumed from the frame
+        String id = input.readUTF();
+        Header header = Header.serializer().deserialize(input, version);
+
+        int bodySize = input.readInt();
+        if (bodySize < 0)
+            throw new IOException("Invalid message body size " + bodySize + " on stream socket");
+        byte[] body = new byte[bodySize];
+        input.readFully(body);
+        Message message = new Message(header, body, version);
+        if (message.getVerb() != StorageService.Verb.STREAM_REPLY)
+            throw new IOException("Non-reply message received on stream socket");
+        handler.doVerb(message, id);
+    }
+
/**
* Sequentially read bytes from the file and write them to the output stream
*
@@ -178,7 +210,7 @@ public class FileStreamTask extends Wrap
int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
reader.readFully(transferBuffer, 0, toTransfer);
- output.write(transferBuffer, 0, toTransfer);
+ compressedoutput.write(transferBuffer, 0, toTransfer);
throttle.throttleDelta(toTransfer);
return toTransfer;
@@ -198,6 +230,7 @@ public class FileStreamTask extends Wrap
{
socket = MessagingService.instance().getConnectionPool(to).newSocket();
output = socket.getOutputStream();
+ input = new DataInputStream(socket.getInputStream());
break;
}
catch (IOException e)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Nov 30 16:38:36 2011
@@ -60,6 +60,8 @@ public class IncomingStreamReader
InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
: ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
session = StreamInSession.get(host, header.sessionId);
+ session.setSocket(socket);
+
session.addFiles(header.pendingFiles);
// set the current file we are streaming so progress shows up in jmx
session.setCurrentFile(header.file);
@@ -88,18 +90,13 @@ public class IncomingStreamReader
try
{
reader = streamIn(dis, localFile, remoteFile);
+ session.finished(remoteFile, reader);
}
catch (IOException ex)
{
retry();
throw ex;
}
- finally
- {
- dis.close();
- }
-
- session.finished(remoteFile, reader);
}
session.closeIfFinished();
@@ -169,6 +166,7 @@ public class IncomingStreamReader
session.retry(remoteFile);
/* Delete the orphaned file. */
- FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+ if (new File(localFile.getFilename()).isFile())
+ FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Nov 30 16:38:36 2011
@@ -18,8 +18,10 @@
package org.apache.cassandra.streaming;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
@@ -27,7 +29,8 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -49,6 +52,7 @@ public class StreamInSession
private String table;
private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
+ private Socket socket;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
{
@@ -89,6 +93,11 @@ public class StreamInSession
this.table = table;
}
+    /**
+     * Records the socket the incoming stream arrived on; replies for this session
+     * are written back over it.
+     */
+    public void setSocket(Socket sock)
+    {
+        socket = sock;
+    }
+
public void addFiles(Collection<PendingFile> files)
{
for (PendingFile file : files)
@@ -111,16 +120,23 @@ public class StreamInSession
current = null;
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
// send a StreamStatus message telling the source node it can delete this file
- MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+ sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+ logger.debug("ack {} sent for {}", reply, remoteFile);
}
public void retry(PendingFile remoteFile) throws IOException
{
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
- MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+ sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
}
+    /**
+     * Writes {@code message} directly onto the session's stream socket, using the
+     * session id as the message id. Requires {@link #setSocket(Socket)} to have been called.
+     */
+    public void sendMessage(Message message) throws IOException
+    {
+        String messageId = String.valueOf(getSessionId());
+        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
+        OutboundTcpConnection.write(message, messageId, out);
+    }
+
+
public void closeIfFinished() throws IOException
{
if (files.isEmpty())
@@ -160,7 +176,14 @@ public class StreamInSession
// send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
- MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+ try
+ {
+ OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(getHost())), context.right.toString(), new DataOutputStream(socket.getOutputStream()));
+ }
+ finally
+ {
+ socket.close();
+ }
if (callback != null)
callback.run();
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Wed Nov 30 16:38:36 2011
@@ -55,6 +55,7 @@ public class StreamReplyVerbHandler impl
switch (reply.action)
{
case FILE_FINISHED:
+ logger.info("Successfully sent {} to {}", reply.file, message.getFrom());
session.validateCurrentFile(reply.file);
session.startNext();
break;
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java Wed Nov 30 16:38:36 2011
@@ -28,9 +28,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -52,13 +50,15 @@ public class BulkLoader
private static final String HELP_OPTION = "help";
private static final String NOPROGRESS_OPTION = "no-progress";
private static final String IGNORE_NODES_OPTION = "ignore";
+ private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
+ private static final String RPC_PORT_OPTION = "port";
public static void main(String args[]) throws IOException
{
LoaderOptions options = LoaderOptions.parseArgs(args);
try
{
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options), options);
+ SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options);
SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
if (options.noProgress)
@@ -171,38 +171,34 @@ public class BulkLoader
{
private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
private final SSTableLoader.OutputHandler outputHandler;
+ private Set<InetAddress> hosts = new HashSet<InetAddress>();
+ private int rpcPort;
- public ExternalClient(SSTableLoader.OutputHandler outputHandler)
+ public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port)
{
super();
this.outputHandler = outputHandler;
+ this.hosts = hosts;
+ this.rpcPort = port;
}
public void init(String keyspace)
{
- outputHandler.output(String.format("Starting client (and waiting %d seconds for gossip) ...", StorageService.RING_DELAY / 1000));
- try
+ Iterator<InetAddress> hostiter = hosts.iterator();
+ while (hostiter.hasNext())
{
- // Init gossip
- StorageService.instance.initClient();
+ try
+ {
- Set<InetAddress> hosts = Gossiper.instance.getLiveMembers();
- hosts.remove(FBUtilities.getBroadcastAddress());
- if (hosts.isEmpty())
- throw new IllegalStateException("Cannot load any sstable, no live member found in the cluster");
-
- // Query endpoint to ranges map and schemas from thrift
- String host = hosts.iterator().next().toString().substring(1);
- int port = DatabaseDescriptor.getRpcPort();
-
- Cassandra.Client client = createThriftClient(host, port);
- List<TokenRange> tokenRanges = client.describe_ring(keyspace);
- List<KsDef> ksDefs = client.describe_keyspaces();
+ // Query endpoint to ranges map and schemas from thrift
+ InetAddress host = hostiter.next();
+ Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
+ List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+ List<KsDef> ksDefs = client.describe_keyspaces();
- Token.TokenFactory tkFactory = StorageService.getPartitioner().getTokenFactory();
+ setPartitioner(client.describe_partitioner());
+ Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
- try
- {
for (TokenRange tr : tokenRanges)
{
Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
@@ -211,30 +207,22 @@ public class BulkLoader
addRangeForEndpoint(range, InetAddress.getByName(ep));
}
}
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException("Got an unknow host from describe_ring()", e);
- }
- for (KsDef ksDef : ksDefs)
+ for (KsDef ksDef : ksDefs)
+ {
+ Set<String> cfs = new HashSet<String>();
+ for (CfDef cfDef : ksDef.cf_defs)
+ cfs.add(cfDef.name);
+ knownCfs.put(ksDef.name, cfs);
+ }
+ break;
+ }
+ catch (Exception e)
{
- Set<String> cfs = new HashSet<String>();
- for (CfDef cfDef : ksDef.cf_defs)
- cfs.add(cfDef.name);
- knownCfs.put(ksDef.name, cfs);
+ if (!hostiter.hasNext())
+ throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
}
}
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void stop()
- {
- StorageService.instance.stopClient();
}
public boolean validateColumnFamily(String keyspace, String cfName)
@@ -260,7 +248,9 @@ public class BulkLoader
public boolean debug;
public boolean verbose;
public boolean noProgress;
+ public int rpcPort = 9160;
+ public Set<InetAddress> hosts = new HashSet<InetAddress>();
public Set<InetAddress> ignores = new HashSet<InetAddress>();
LoaderOptions(File directory)
@@ -312,6 +302,32 @@ public class BulkLoader
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
+ if (cmd.hasOption(RPC_PORT_OPTION))
+ opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION));
+
+ if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
+ {
+ String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
+ try
+ {
+ for (String node : nodes)
+ {
+ opts.hosts.add(InetAddress.getByName(node.trim()));
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ errorMsg("Unknown host: " + e.getMessage(), options);
+ }
+
+ }
+ else
+ {
+ System.err.println("Initial hosts must be specified (-d)");
+ printUsage(options);
+ System.exit(1);
+ }
+
if (cmd.hasOption(IGNORE_NODES_OPTION))
{
String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
@@ -363,6 +379,8 @@ public class BulkLoader
options.addOption("h", HELP_OPTION, "display this help message");
options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
+ options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
+ options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
return options;
}