You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/30 17:38:38 UTC

svn commit: r1208499 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/streaming/ src/java/or...

Author: brandonwilliams
Date: Wed Nov 30 16:38:36 2011
New Revision: 1208499

URL: http://svn.apache.org/viewvc?rev=1208499&view=rev
Log:
Bulk loader is no longer a fat client, hadoop bulk loader output format.
Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-3045

Added:
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java   (with props)
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java   (with props)
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Nov 30 16:38:36 2011
@@ -9,6 +9,8 @@
    (CASSANDRA-3116)
  * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
  * Use faster bytes comparison (CASSANDRA-3434)
+ * Bulk loader is no longer a fat client, (HADOOP) bulk load output format
+   (CASSANDRA-3045)
 
 
 1.0.5

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Nov 30 16:38:36 2011
@@ -109,7 +109,7 @@ public class Config
     public RequestSchedulerId request_scheduler_id;
     public RequestSchedulerOptions request_scheduler_options;
 
-    public EncryptionOptions encryption_options;
+    public EncryptionOptions encryption_options = new EncryptionOptions();
 
     public Integer index_interval = 128;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Nov 30 16:38:36 2011
@@ -107,30 +107,38 @@ public class DatabaseDescriptor
 
         return url;
     }
+
+    public static void initDefaultsOnly()
+    {
+        conf = new Config();
+    }
     
     static
     {
         try
         {
-            URL url = getStorageConfigURL();
-            logger.info("Loading settings from " + url);
-
-            InputStream input = null;
-            try
-            {
-                input = url.openStream();
-            }
-            catch (IOException e)
+            // only load yaml if conf wasn't already set
+            if (conf == null)
             {
-                // getStorageConfigURL should have ruled this out
-                throw new AssertionError(e);
+                URL url = getStorageConfigURL();
+                logger.info("Loading settings from " + url);
+                InputStream input = null;
+                try
+                {
+                    input = url.openStream();
+                }
+                catch (IOException e)
+                {
+                    // getStorageConfigURL should have ruled this out
+                    throw new AssertionError(e);
+                }
+                org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
+                TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
+                seedDesc.putMapPropertyType("parameters", String.class, String.class);
+                constructor.addTypeDescription(seedDesc);
+                Yaml yaml = new Yaml(new Loader(constructor));
+                conf = (Config)yaml.load(input);
             }
-            org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
-            TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
-            seedDesc.putMapPropertyType("parameters", String.class, String.class);
-            constructor.addTypeDescription(seedDesc);
-            Yaml yaml = new Yaml(new Loader(constructor));
-            conf = (Config)yaml.load(input);
             
             if (conf.commitlog_sync == null)
             {

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java?rev=1208499&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java Wed Nov 30 16:38:36 2011
@@ -0,0 +1,116 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ * Hadoop output format whose record writers are {@link BulkRecordWriter} instances.
+ * Serves both the mapreduce API and the deprecated mapred API.
+ */
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(BulkOutputFormat.class);
+
+    /** Validates the configuration carried by the given job context. */
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        Configuration jobConf = context.getConfiguration();
+        checkOutputSpecs(jobConf);
+    }
+
+    /**
+     * Accepts the configuration only when both an output keyspace and an
+     * output column family are present.
+     *
+     * @throws UnsupportedOperationException if either one is missing
+     */
+    private void checkOutputSpecs(Configuration conf)
+    {
+        boolean complete = ConfigHelper.getOutputKeyspace(conf) != null
+                           && ConfigHelper.getOutputColumnFamily(conf) != null;
+        if (complete)
+            return;
+        throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+    }
+
+    /** Returns a committer whose hooks all do nothing. */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Deprecated mapred OutputFormat entry point (kept for streaming); validates the job configuration. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /** Deprecated mapred OutputFormat entry point (kept for streaming); builds the writer from the job configuration. */
+    @Deprecated
+    public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+    {
+        return new BulkRecordWriter(job);
+    }
+
+    /** Builds the writer from the task attempt context. */
+    @Override
+    public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new BulkRecordWriter(context);
+    }
+
+    /** Output committer with empty hooks that never asks for a task commit. */
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void setupJob(JobContext jobContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+
+        // there is never anything to commit
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public void abortTask(TaskAttemptContext taskContext) { }
+    }
+}

Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java?rev=1208499&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java Wed Nov 30 16:38:36 2011
@@ -0,0 +1,292 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+
+/**
+ * Writes mutations through an {@link SSTableSimpleUnsortedWriter} into a local
+ * directory and, on close, streams that directory with an {@link SSTableLoader}.
+ * Serves both the mapreduce API and the deprecated mapred API.
+ */
+final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
+implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+{
+    private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper";
+    private final Configuration conf;
+    private final boolean isSuper;
+    private final SSTableSimpleUnsortedWriter writer;
+    private final SSTableLoader loader;
+
+    static {
+        DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml
+    }
+
+    BulkRecordWriter(TaskAttemptContext context) throws IOException
+    {
+        this(context.getConfiguration());
+    }
+
+    /**
+     * Prepares the local output directory, the sstable writer and the loader.
+     *
+     * @param conf job configuration supplying keyspace, column family and the bulk output settings
+     * @throws IOException if no output location is configured or the output directory cannot be created
+     */
+    BulkRecordWriter(Configuration conf) throws IOException
+    {
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        String keyspace = ConfigHelper.getOutputKeyspace(conf);
+        File outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader
+        // mkdirs() returns false both on failure and when the directory already exists
+        if (!outputdir.mkdirs() && !outputdir.isDirectory())
+            throw new IOException("Could not create output directory " + outputdir);
+        this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF));
+        AbstractType subcomparator = isSuper ? BytesType.instance : null;
+        this.writer = new SSTableSimpleUnsortedWriter(
+                outputdir,
+                keyspace,
+                ConfigHelper.getOutputColumnFamily(conf),
+                BytesType.instance,
+                subcomparator,
+                Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
+        this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf)), new NullOutputHandler());
+    }
+
+    /**
+     * Resolves the base directory for local output: the bulk output location
+     * setting if present, otherwise mapred.local.dir.
+     *
+     * @throws IOException if neither setting is defined
+     */
+    private String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, conf.get("mapred.local.dir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting mapred.local.dir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
+    /**
+     * Starts a new row for the key and adds the columns of every mutation to it.
+     * NOTE(review): a mutation without column_or_supercolumn (e.g. a pure deletion)
+     * is not handled here and would fail with a NullPointerException -- confirm
+     * callers never emit one.
+     */
+    @Override
+    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
+    {
+        writer.newRow(keybuff);
+        for (Mutation mut : value)
+        {
+            ColumnOrSuperColumn cosc = mut.getColumn_or_supercolumn();
+            if (isSuper)
+            {
+                SuperColumn sc = cosc.getSuper_column();
+                writer.newSuperColumn(sc.name);
+                for (Column column : sc.columns)
+                    writer.addColumn(column.name, column.value, column.timestamp);
+            }
+            else
+            {
+                Column column = cosc.column;
+                writer.addColumn(column.name, column.value, column.timestamp);
+            }
+        }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    /**
+     * Closes the sstable writer, then streams the output and waits for completion.
+     *
+     * @throws IOException if the wait is interrupted (the interrupt flag is restored first)
+     */
+    private void close() throws IOException
+    {
+        writer.close();
+        try
+        {
+            loader.stream().get();
+        }
+        catch (InterruptedException e)
+        {
+            // restore the flag so code further up the stack can still see the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /** Loader client that learns ring and schema information from the cluster over Thrift. */
+    static class ExternalClient extends SSTableLoader.Client
+    {
+        private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
+        private final String hostlist;
+        private final int rpcPort;
+
+        public ExternalClient(String hostlist, int port)
+        {
+            super();
+            this.hostlist = hostlist;
+            this.rpcPort = port;
+        }
+
+        /**
+         * Asks the configured hosts, one after another, for the partitioner, the ring of the
+         * keyspace and the keyspace definitions; stops at the first host that answers.
+         *
+         * @throws RuntimeException if a host name cannot be resolved or the last host tried fails
+         */
+        public void init(String keyspace)
+        {
+            Set<InetAddress> hosts = new HashSet<InetAddress>();
+            for (String node : hostlist.split(","))
+            {
+                try
+                {
+                    // tolerate whitespace around the commas of the host list
+                    hosts.add(InetAddress.getByName(node.trim()));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
+            {
+                InetAddress host = hostiter.next();
+                Cassandra.Client client = null;
+                try
+                {
+                    client = createThriftClient(host.getHostAddress(), rpcPort);
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
+
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Set<String> cfs = new HashSet<String>();
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.add(cfDef.name);
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
+                {
+                    // a failed host is skipped while others remain; only the last failure is reported
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+                }
+                finally
+                {
+                    // the connection is only needed for the queries above
+                    closeTransport(client);
+                }
+            }
+        }
+
+        public boolean validateColumnFamily(String keyspace, String cfName)
+        {
+            Set<String> cfs = knownCfs.get(keyspace);
+            return cfs != null && cfs.contains(cfName);
+        }
+
+        /** Opens a framed Thrift connection to the given host and port. */
+        private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
+        {
+            TSocket socket = new TSocket(host, port);
+            TTransport trans = new TFramedTransport(socket);
+            trans.open();
+            TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
+            return new Cassandra.Client(protocol);
+        }
+
+        /** Closes the transport behind the client if it is open; a null client is ignored. */
+        private static void closeTransport(Cassandra.Client client)
+        {
+            if (client == null)
+                return;
+            TTransport transport = client.getInputProtocol().getTransport();
+            if (transport.isOpen())
+                transport.close();
+        }
+    }
+
+    /** Output handler that discards every message. */
+    static class NullOutputHandler implements SSTableLoader.OutputHandler
+    {
+        public void output(String msg) {}
+
+        public void debug(String msg) {}
+    }
+}

Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java Wed Nov 30 16:38:36 2011
@@ -27,9 +27,11 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -84,7 +86,7 @@ public class SSTableLoader
 
                 try
                 {
-                    sstables.add(SSTableReader.open(desc, components, null, StorageService.getPartitioner()));
+                    sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
                 }
                 catch (IOException e)
                 {
@@ -227,15 +229,15 @@ public class SSTableLoader
     public static abstract class Client
     {
         private final Map<InetAddress, Collection<Range>> endpointToRanges = new HashMap<InetAddress, Collection<Range>>();
+        private IPartitioner partitioner;
 
         /**
          * Initialize the client.
          * Perform any step necessary so that after the call to the this
          * method:
-         *   * StorageService is correctly initialized (so that gossip and
-         *     messaging service is too)
+         *   * partitioner is initialized
          *   * getEndpointToRangesMap() returns a correct map
-         * This method is guaranted to be called before any other method of a
+         * This method is guaranteed to be called before any other method of a
          * client.
          */
         public abstract void init(String keyspace);
@@ -256,6 +258,16 @@ public class SSTableLoader
             return endpointToRanges;
         }
 
+        protected void setPartitioner(String partclass) throws ConfigurationException
+        {
+            this.partitioner = FBUtilities.newPartitioner(partclass);
+        }
+
+        public IPartitioner getPartitioner()
+        {
+            return partitioner;
+        }
+
         protected void addRangeForEndpoint(Range range, InetAddress endpoint)
         {
             Collection<Range> ranges = endpointToRanges.get(endpoint);

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Nov 30 16:38:36 2011
@@ -40,7 +40,7 @@ public class Header
         serializer_ = new HeaderSerializer();        
     }
     
-    static IVersionedSerializer<Header> serializer()
+    public static IVersionedSerializer<Header> serializer()
     {
         return serializer_;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Nov 30 16:38:36 2011
@@ -30,7 +30,7 @@ public class Message
     private final byte[] body_;
     private final transient int version;
 
-    Message(Header header, byte[] body, int version)
+    public Message(Header header, byte[] body, int version)
     {
         assert header != null;
         assert body != null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Nov 30 16:38:36 2011
@@ -162,7 +162,7 @@ public class OutboundTcpConnection exten
         }
     }
 
-    static void write(Message message, String id, DataOutputStream out) throws IOException
+    public static void write(Message message, String id, DataOutputStream out) throws IOException
     {
         /*
          Setting up the protocol header. This is 4 bytes long

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Nov 30 16:38:36 2011
@@ -18,9 +18,7 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
@@ -30,7 +28,11 @@ import org.apache.cassandra.gms.Gossiper
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.OutboundTcpConnection;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throttle;
@@ -54,12 +56,15 @@ public class FileStreamTask extends Wrap
 
     // communication socket
     private Socket socket;
-    // socket's output stream
+    // socket's output/input stream
     private OutputStream output;
+    private OutputStream compressedoutput;
+    private DataInputStream input;
     // allocate buffer to use for transfers only once
     private final byte[] transferBuffer = new byte[CHUNK_SIZE];
     // outbound global throughput limiter
     private final Throttle throttle;
+    private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
 
     public FileStreamTask(StreamHeader header, InetAddress to)
     {
@@ -89,6 +94,12 @@ public class FileStreamTask extends Wrap
             // successfully connected: stream.
             // (at this point, if we fail, it is the receiver's job to re-request)
             stream();
+            if (StreamOutSession.get(to, header.sessionId).getFiles().size() == 0)
+            {
+                // we are the last of our kind, receive the final confirmation before closing
+                receiveReply();
+                logger.info("Finished streaming session to {}", to);
+            }
         }
         finally
         {
@@ -125,7 +136,7 @@ public class FileStreamTask extends Wrap
                                 : RandomAccessReader.open(new File(header.file.getFilename()), true);
 
         // setting up data compression stream
-        output = new LZFOutputStream(output);
+        compressedoutput = new LZFOutputStream(output);
 
         try
         {
@@ -149,11 +160,13 @@ public class FileStreamTask extends Wrap
                 }
 
                 // make sure that current section is send
-                output.flush();
+                compressedoutput.flush();
 
                 if (logger.isDebugEnabled())
                     logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
             }
+            // receive reply confirmation
+            receiveReply();
         }
         finally
         {
@@ -162,6 +175,25 @@ public class FileStreamTask extends Wrap
         }
     }
 
+    private void receiveReply() throws IOException
+    {
+        MessagingService.validateMagic(input.readInt());
+        int msheader = input.readInt();
+        assert MessagingService.getBits(msheader, 3, 1) == 0 : "Stream received before stream reply";
+        int version = MessagingService.getBits(msheader, 15, 8);
+
+        int totalSize = input.readInt();
+        String id = input.readUTF();
+        Header header = Header.serializer().deserialize(input, version);
+
+        int bodySize = input.readInt();
+        byte[] body = new byte[bodySize];
+        input.readFully(body);
+        Message message = new Message(header, body, version);
+        assert message.getVerb() == StorageService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
+        handler.doVerb(message, id);
+    }
+
     /**
      * Sequentially read bytes from the file and write them to the output stream
      *
@@ -178,7 +210,7 @@ public class FileStreamTask extends Wrap
         int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
 
         reader.readFully(transferBuffer, 0, toTransfer);
-        output.write(transferBuffer, 0, toTransfer);
+        compressedoutput.write(transferBuffer, 0, toTransfer);
         throttle.throttleDelta(toTransfer);
 
         return toTransfer;
@@ -198,6 +230,7 @@ public class FileStreamTask extends Wrap
             {
                 socket = MessagingService.instance().getConnectionPool(to).newSocket();
                 output = socket.getOutputStream();
+                input = new DataInputStream(socket.getInputStream());
                 break;
             }
             catch (IOException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Nov 30 16:38:36 2011
@@ -60,6 +60,8 @@ public class IncomingStreamReader
         InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
                            : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
         session = StreamInSession.get(host, header.sessionId);
+        session.setSocket(socket);
+
         session.addFiles(header.pendingFiles);
         // set the current file we are streaming so progress shows up in jmx
         session.setCurrentFile(header.file);
@@ -88,18 +90,13 @@ public class IncomingStreamReader
             try
             {
                 reader = streamIn(dis, localFile, remoteFile);
+                session.finished(remoteFile, reader);
             }
             catch (IOException ex)
             {
                 retry();
                 throw ex;
             }
-            finally
-            {
-                dis.close();
-            }
-
-            session.finished(remoteFile, reader);
         }
 
         session.closeIfFinished();
@@ -169,6 +166,7 @@ public class IncomingStreamReader
         session.retry(remoteFile);
 
         /* Delete the orphaned file. */
-        FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+        if (new File(localFile.getFilename()).isFile())
+            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Nov 30 16:38:36 2011
@@ -18,8 +18,10 @@
 
 package org.apache.cassandra.streaming;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
@@ -27,7 +29,8 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.utils.Pair;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -49,6 +52,7 @@ public class StreamInSession
     private String table;
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
+    private Socket socket;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
@@ -89,6 +93,11 @@ public class StreamInSession
         this.table = table;
     }
 
+    public void setSocket(Socket socket)
+    {
+        this.socket = socket;
+    }
+
     public void addFiles(Collection<PendingFile> files)
     {
         for (PendingFile file : files)
@@ -111,16 +120,23 @@ public class StreamInSession
             current = null;
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
         // send a StreamStatus message telling the source node it can delete this file
-        MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+        logger.debug("ack {} sent for {}", reply, remoteFile);
     }
 
     public void retry(PendingFile remoteFile) throws IOException
     {
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
         logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
-        MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
     }
 
+    public void sendMessage(Message message) throws IOException
+    {
+        OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream()));
+    }
+
+
     public void closeIfFinished() throws IOException
     {
         if (files.isEmpty())
@@ -160,7 +176,14 @@ public class StreamInSession
             // send reply to source that we're done
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
             logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
-            MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+            try
+            {
+                OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(getHost())), context.right.toString(), new DataOutputStream(socket.getOutputStream()));
+            }
+            finally
+            {
+                socket.close();
+            }
 
             if (callback != null)
                 callback.run();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Wed Nov 30 16:38:36 2011
@@ -55,6 +55,7 @@ public class StreamReplyVerbHandler impl
             switch (reply.action)
             {
                 case FILE_FINISHED:
+                    logger.info("Successfully sent {} to {}", reply.file, message.getFrom());
                     session.validateCurrentFile(reply.file);
                     session.startNext();
                     break;

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java Wed Nov 30 16:38:36 2011
@@ -28,9 +28,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -52,13 +50,15 @@ public class BulkLoader
     private static final String HELP_OPTION  = "help";
     private static final String NOPROGRESS_OPTION  = "no-progress";
     private static final String IGNORE_NODES_OPTION  = "ignore";
+    private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
+    private static final String RPC_PORT_OPTION = "port";
 
     public static void main(String args[]) throws IOException
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
         try
         {
-            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options), options);
+            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options);
             SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
 
             if (options.noProgress)
@@ -171,38 +171,34 @@ public class BulkLoader
     {
         private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
         private final SSTableLoader.OutputHandler outputHandler;
+        private Set<InetAddress> hosts = new HashSet<InetAddress>();
+        private int rpcPort;
 
-        public ExternalClient(SSTableLoader.OutputHandler outputHandler)
+        public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port)
         {
             super();
             this.outputHandler = outputHandler;
+            this.hosts = hosts;
+            this.rpcPort = port;
         }
 
         public void init(String keyspace)
         {
-            outputHandler.output(String.format("Starting client (and waiting %d seconds for gossip) ...", StorageService.RING_DELAY / 1000));
-            try
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
             {
-                // Init gossip
-                StorageService.instance.initClient();
+                try
+                {
 
-                Set<InetAddress> hosts = Gossiper.instance.getLiveMembers();
-                hosts.remove(FBUtilities.getBroadcastAddress());
-                if (hosts.isEmpty())
-                    throw new IllegalStateException("Cannot load any sstable, no live member found in the cluster");
-
-                // Query endpoint to ranges map and schemas from thrift
-                String host = hosts.iterator().next().toString().substring(1);
-                int port = DatabaseDescriptor.getRpcPort();
-
-                Cassandra.Client client = createThriftClient(host, port);
-                List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                List<KsDef> ksDefs = client.describe_keyspaces();
+                    // Query endpoint to ranges map and schemas from thrift
+                    InetAddress host = hostiter.next();
+                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
 
-                Token.TokenFactory tkFactory = StorageService.getPartitioner().getTokenFactory();
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
 
-                try
-                {
                     for (TokenRange tr : tokenRanges)
                     {
                         Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
@@ -211,30 +207,22 @@ public class BulkLoader
                             addRangeForEndpoint(range, InetAddress.getByName(ep));
                         }
                     }
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException("Got an unknow host from describe_ring()", e);
-                }
 
-                for (KsDef ksDef : ksDefs)
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Set<String> cfs = new HashSet<String>();
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.add(cfDef.name);
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
                 {
-                    Set<String> cfs = new HashSet<String>();
-                    for (CfDef cfDef : ksDef.cf_defs)
-                        cfs.add(cfDef.name);
-                    knownCfs.put(ksDef.name, cfs);
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
                 }
             }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void stop()
-        {
-            StorageService.instance.stopClient();
         }
 
         public boolean validateColumnFamily(String keyspace, String cfName)
@@ -260,7 +248,9 @@ public class BulkLoader
         public boolean debug;
         public boolean verbose;
         public boolean noProgress;
+        public int rpcPort = 9160;
 
+        public Set<InetAddress> hosts = new HashSet<InetAddress>();
         public Set<InetAddress> ignores = new HashSet<InetAddress>();
 
         LoaderOptions(File directory)
@@ -312,6 +302,32 @@ public class BulkLoader
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
+                if (cmd.hasOption(RPC_PORT_OPTION))
+                    opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION));
+
+                if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
+                {
+                    String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
+                    try
+                    {
+                        for (String node : nodes)
+                        {
+                            opts.hosts.add(InetAddress.getByName(node.trim()));
+                        }
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        errorMsg("Unknown host: " + e.getMessage(), options);
+                    }
+
+                }
+                else
+                {
+                    System.err.println("Initial hosts must be specified (-d)");
+                    printUsage(options);
+                    System.exit(1);
+                }
+
                 if (cmd.hasOption(IGNORE_NODES_OPTION))
                 {
                     String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
@@ -363,6 +379,8 @@ public class BulkLoader
             options.addOption("h",  HELP_OPTION,         "display this help message");
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
             options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
+            options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
+            options.addOption("p",  RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
             return options;
         }