You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:37:38 UTC

svn commit: r901436 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: FileStreamTask.java IncomingTcpConnection.java MessagingService.java io/IncomingStreamReader.java

Author: jbellis
Date: Wed Jan 20 23:37:38 2010
New Revision: 901436

URL: http://svn.apache.org/viewvc?rev=901436&view=rev
Log:
implement streaming w/ blocking io
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:37:38 2010
@@ -20,49 +20,89 @@
 
 import java.io.*;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
 
 import org.apache.log4j.Logger;
 
-class FileStreamTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class FileStreamTask extends WrappedRunnable
 {
-    private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
-    
-    private String file_;
-    private long startPosition_;
-    private long total_;
-    private InetAddress from_;
-    private InetAddress to_;
+    private static Logger logger = Logger.getLogger( FileStreamTask.class );
     
-    FileStreamTask(String file, long startPosition, long total, InetAddress from, InetAddress to)
+    public static final int CHUNK_SIZE = 64*1024*1024;
+
+    private final String file;
+    private final long startPosition;
+    private final long endPosition;
+    private final InetAddress to;
+
+    FileStreamTask(String file, long startPosition, long endPosition, InetAddress from, InetAddress to)
     {
-        file_ = file;
-        startPosition_ = startPosition;
-        total_ = total;
-        from_ = from;
-        to_ = to;
+        this.file = file;
+        this.startPosition = startPosition;
+        this.endPosition = endPosition;
+        this.to = to;
     }
     
-    public void run()
+    public void runMayThrow() throws IOException
+    {
+        SocketChannel channel = SocketChannel.open(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        try
+        {
+            stream(channel);
+        }
+        finally
+        {
+            try
+            {
+                channel.close();
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error closing socket", e);
+            }
+        }
+        if (logger.isDebugEnabled())
+          logger.debug("Done streaming " + file);
+    }
+
+    private void stream(SocketChannel channel) throws IOException
     {
-        /*
-        TODO
-        TcpConnection connection = null;
+        long start = startPosition;
+        RandomAccessFile raf = new RandomAccessFile(new File(file), "r");
         try
-        {                        
-            connection = new TcpConnection(from_, to_);
-            File file = new File(file_);             
-            connection.stream(file, startPosition_, total_);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done streaming " + file);
+        {
+            FileChannel fc = raf.getChannel();
+
+            ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+            channel.write(buffer);
+            assert buffer.remaining() == 0;
+
+            while (start < endPosition)
+            {
+                long bytesTransferred = fc.transferTo(start, CHUNK_SIZE, channel);
+                if (logger.isDebugEnabled())
+                    logger.debug("Bytes transferred " + bytesTransferred);
+                start += bytesTransferred;
+            }
         }
-        catch (Exception e)
+        finally
         {
-            if (connection != null)
+            try
             {
-                connection.errorClose();
+                raf.close();
+            }
+            catch (IOException e)
+            {
+                throw new AssertionError(e);
             }
-            throw new RuntimeException(e);
         }
-         */
     }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:37:38 2010
@@ -9,6 +9,9 @@
 
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.net.io.IncomingStreamReader;
+import org.apache.cassandra.utils.FBUtilities;
+
 public class IncomingTcpConnection extends Thread
 {
     private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
@@ -18,9 +21,11 @@
     private final byte[] headerBytes = new byte[4];
     private final byte[] sizeBytes = new byte[4];
     private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
+    private Socket socket;
 
     public IncomingTcpConnection(Socket socket)
     {
+        this.socket = socket;
         try
         {
             input = new DataInputStream(socket.getInputStream());
@@ -40,13 +45,27 @@
             {
                 input.readFully(protocolBytes);
                 MessagingService.validateProtocol(protocolBytes);
+
                 input.readFully(headerBytes);
-                input.readFully(sizeBytes);
-                int size = sizeBuffer.getInt();
-                sizeBuffer.clear();
-                byte[] contentBytes = new byte[size];
-                input.readFully(contentBytes);
-                MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                int pH = FBUtilities.byteArrayToInt(headerBytes);
+                int type = MessagingService.getBits(pH, 1, 2);
+                boolean isStream = MessagingService.getBits(pH, 3, 1) == 1;
+                int version = MessagingService.getBits(pH, 15, 8);
+
+                if (isStream)
+                {
+                    new IncomingStreamReader(socket.getChannel()).read();
+                }
+                else
+                {
+                    input.readFully(sizeBytes);
+                    int size = sizeBuffer.getInt();
+                    sizeBuffer.clear();
+
+                    byte[] contentBytes = new byte[size];
+                    input.readFully(contentBytes);
+                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                }
             }
             catch (IOException e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:37:38 2010
@@ -379,14 +379,14 @@
      * to not hold any of the contents of the file in memory.
      * @param file name of file to stream.
      * @param startPosition position inside the file
-     * @param total number of bytes to stream
+     * @param endPosition position inside the file at which to stop streaming (exclusive)
      * @param to endpoint to which we need to stream the file.
     */
 
-    public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
+    public void stream(String file, long startPosition, long endPosition, InetAddress from, InetAddress to)
     {
         /* Streaming asynchronously on streamExector_ threads. */
-        Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+        Runnable streamingTask = new FileStreamTask(file, startPosition, endPosition, from, to);
         streamExecutor_.execute(streamingTask);
     }
 

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=901436&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Wed Jan 20 23:37:38 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.FileStreamTask;
+
+public class IncomingStreamReader
+{
+    private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
+    private StreamContextManager.StreamContext streamContext;
+    private StreamContextManager.StreamStatus streamStatus;
+    private SocketChannel socketChannel;
+
+    public IncomingStreamReader(SocketChannel socketChannel)
+    {
+        this.socketChannel = socketChannel;
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        streamContext = StreamContextManager.getStreamContext(remoteAddress.getAddress());
+        assert streamContext != null;
+        streamStatus = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
+        assert streamStatus != null;
+    }
+
+    public void read() throws IOException
+    {
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        if (logger.isDebugEnabled())
+          logger.debug("Creating file for " + streamContext.getTargetFile());
+        FileOutputStream fos = new FileOutputStream(streamContext.getTargetFile(), true);
+        FileChannel fc = fos.getChannel();
+
+        long bytesRead = 0;
+        try
+        {
+            while (bytesRead < streamContext.getExpectedBytes())
+                bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
+        }
+        catch (IOException ex)
+        {
+            /* Ask the source node to re-stream this file. */
+            streamStatus.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+            handleStreamCompletion(remoteAddress.getAddress());
+            /* Delete the orphaned file. */
+            File file = new File(streamContext.getTargetFile());
+            file.delete();
+            throw ex;
+        }
+
+        if (bytesRead == streamContext.getExpectedBytes())
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Removing stream context " + streamContext);
+            }
+            fc.close();
+            handleStreamCompletion(remoteAddress.getAddress());
+        }
+    }
+
+    private void handleStreamCompletion(InetAddress remoteHost) throws IOException
+    {
+        /*
+         * Streaming is complete. If all the data that has to be received has arrived, inform the sender via
+         * the stream completion callback so that the source may perform the requisite cleanup.
+        */
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if (streamComplete != null)
+            streamComplete.onStreamCompletion(remoteHost, streamContext, streamStatus);
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
------------------------------------------------------------------------------
    svn:eol-style = native