You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:37:38 UTC
svn commit: r901436 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/net:
FileStreamTask.java IncomingTcpConnection.java MessagingService.java
io/IncomingStreamReader.java
Author: jbellis
Date: Wed Jan 20 23:37:38 2010
New Revision: 901436
URL: http://svn.apache.org/viewvc?rev=901436&view=rev
Log:
implement streaming w/ blocking io
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed Jan 20 23:37:38 2010
@@ -20,49 +20,89 @@
import java.io.*;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
import org.apache.log4j.Logger;
-class FileStreamTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class FileStreamTask extends WrappedRunnable
{
- private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
-
- private String file_;
- private long startPosition_;
- private long total_;
- private InetAddress from_;
- private InetAddress to_;
+ private static Logger logger = Logger.getLogger( FileStreamTask.class );
- FileStreamTask(String file, long startPosition, long total, InetAddress from, InetAddress to)
+ public static final int CHUNK_SIZE = 64*1024*1024;
+
+ private final String file;
+ private final long startPosition;
+ private final long endPosition;
+ private final InetAddress to;
+
+ FileStreamTask(String file, long startPosition, long endPosition, InetAddress from, InetAddress to)
{
- file_ = file;
- startPosition_ = startPosition;
- total_ = total;
- from_ = from;
- to_ = to;
+ this.file = file;
+ this.startPosition = startPosition;
+ this.endPosition = endPosition;
+ this.to = to;
}
- public void run()
+ public void runMayThrow() throws IOException
+ {
+ SocketChannel channel = SocketChannel.open(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+ try
+ {
+ stream(channel);
+ }
+ finally
+ {
+ try
+ {
+ channel.close();
+ }
+ catch (IOException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("error closing socket", e);
+ }
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("Done streaming " + file);
+ }
+
+ private void stream(SocketChannel channel) throws IOException
{
- /*
- TODO
- TcpConnection connection = null;
+ long start = startPosition;
+ RandomAccessFile raf = new RandomAccessFile(new File(file), "r");
try
- {
- connection = new TcpConnection(from_, to_);
- File file = new File(file_);
- connection.stream(file, startPosition_, total_);
- if (logger_.isDebugEnabled())
- logger_.debug("Done streaming " + file);
+ {
+ FileChannel fc = raf.getChannel();
+
+ ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+ channel.write(buffer);
+ assert buffer.remaining() == 0;
+
+ while (start < endPosition)
+ {
+ long bytesTransferred = fc.transferTo(start, CHUNK_SIZE, channel);
+ if (logger.isDebugEnabled())
+ logger.debug("Bytes transferred " + bytesTransferred);
+ start += bytesTransferred;
+ }
}
- catch (Exception e)
+ finally
{
- if (connection != null)
+ try
{
- connection.errorClose();
+ raf.close();
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
}
- throw new RuntimeException(e);
}
- */
}
+
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 20 23:37:38 2010
@@ -9,6 +9,9 @@
import org.apache.log4j.Logger;
+import org.apache.cassandra.net.io.IncomingStreamReader;
+import org.apache.cassandra.utils.FBUtilities;
+
public class IncomingTcpConnection extends Thread
{
private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
@@ -18,9 +21,11 @@
private final byte[] headerBytes = new byte[4];
private final byte[] sizeBytes = new byte[4];
private final ByteBuffer sizeBuffer = ByteBuffer.wrap(sizeBytes).asReadOnlyBuffer();
+ private Socket socket;
public IncomingTcpConnection(Socket socket)
{
+ this.socket = socket;
try
{
input = new DataInputStream(socket.getInputStream());
@@ -40,13 +45,27 @@
{
input.readFully(protocolBytes);
MessagingService.validateProtocol(protocolBytes);
+
input.readFully(headerBytes);
- input.readFully(sizeBytes);
- int size = sizeBuffer.getInt();
- sizeBuffer.clear();
- byte[] contentBytes = new byte[size];
- input.readFully(contentBytes);
- MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+ int pH = FBUtilities.byteArrayToInt(headerBytes);
+ int type = MessagingService.getBits(pH, 1, 2);
+ boolean isStream = MessagingService.getBits(pH, 3, 1) == 1;
+ int version = MessagingService.getBits(pH, 15, 8);
+
+ if (isStream)
+ {
+ new IncomingStreamReader(socket.getChannel()).read();
+ }
+ else
+ {
+ input.readFully(sizeBytes);
+ int size = sizeBuffer.getInt();
+ sizeBuffer.clear();
+
+ byte[] contentBytes = new byte[size];
+ input.readFully(contentBytes);
+ MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+ }
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901436&r1=901435&r2=901436&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:37:38 2010
@@ -379,14 +379,14 @@
* to not hold any of the contents of the file in memory.
* @param file name of file to stream.
* @param startPosition position inside the file
- * @param total number of bytes to stream
+ * @param endPosition
* @param to endpoint to which we need to stream the file.
*/
- public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
+ public void stream(String file, long startPosition, long endPosition, InetAddress from, InetAddress to)
{
/* Streaming asynchronously on streamExector_ threads. */
- Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+ Runnable streamingTask = new FileStreamTask(file, startPosition, endPosition, from, to);
streamExecutor_.execute(streamingTask);
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=901436&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Wed Jan 20 23:37:38 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.FileStreamTask;
+
+public class IncomingStreamReader
+{
+ private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
+ private StreamContextManager.StreamContext streamContext;
+ private StreamContextManager.StreamStatus streamStatus;
+ private SocketChannel socketChannel;
+
+ public IncomingStreamReader(SocketChannel socketChannel)
+ {
+ this.socketChannel = socketChannel;
+ InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+ streamContext = StreamContextManager.getStreamContext(remoteAddress.getAddress());
+ assert streamContext != null;
+ streamStatus = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
+ assert streamStatus != null;
+ }
+
+ public void read() throws IOException
+ {
+ InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+ if (logger.isDebugEnabled())
+ logger.debug("Creating file for " + streamContext.getTargetFile());
+ FileOutputStream fos = new FileOutputStream(streamContext.getTargetFile(), true);
+ FileChannel fc = fos.getChannel();
+
+ long bytesRead = 0;
+ try
+ {
+ while (bytesRead < streamContext.getExpectedBytes())
+ bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
+ }
+ catch (IOException ex)
+ {
+ /* Ask the source node to re-stream this file. */
+ streamStatus.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+ handleStreamCompletion(remoteAddress.getAddress());
+ /* Delete the orphaned file. */
+ File file = new File(streamContext.getTargetFile());
+ file.delete();
+ throw ex;
+ }
+
+ if (bytesRead == streamContext.getExpectedBytes())
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Removing stream context " + streamContext);
+ }
+ fc.close();
+ handleStreamCompletion(remoteAddress.getAddress());
+ }
+ }
+
+ private void handleStreamCompletion(InetAddress remoteHost) throws IOException
+ {
+ /*
+ * Streaming is complete. If all the data that has to be received inform the sender via
+ * the stream completion callback so that the source may perform the requisite cleanup.
+ */
+ IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+ if (streamComplete != null)
+ streamComplete.onStreamCompletion(remoteHost, streamContext, streamStatus);
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
------------------------------------------------------------------------------
svn:eol-style = native