You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/11/12 20:57:15 UTC

svn commit: r1408427 - /manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java

Author: kwright
Date: Mon Nov 12 19:57:14 2012
New Revision: 1408427

URL: http://svn.apache.org/viewvc?rev=1408427&view=rev
Log:
Add cross-thread input stream class

Added:
    manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java   (with props)

Added: manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java?rev=1408427&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java (added)
+++ manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java Mon Nov 12 19:57:14 2012
@@ -0,0 +1,197 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.common;
+
+import java.io.*;
+
+/** Cross-thread input stream.  Use this class when you have a helper thread
+* reading from a socket, and you need the ability to read safely from a ManifoldCF
+* worker thread.
+*/
+public class XThreadInputStream extends InputStream
+{
+  private byte[] buffer = new byte[65536];
+  private int startPoint = 0;
+  private int byteCount = 0;
+  private boolean streamEnd = false;
+  private InputStream sourceStream;
+  
+  /** Constructor */
+  public XThreadInputStream(InputStream sourceStream)
+  {
+    this.sourceStream = sourceStream;
+  }
+  
+  /** This method is called from the helper thread side, to keep the queue
+  * stuffed.  It exits when the stream is empty, or when interrupted.
+  */
+  public void stuffQueue()
+    throws IOException, InterruptedException
+  {
+    while (true)
+    {
+      int maxToRead;
+      int readStartPoint;
+      synchronized (this)
+      {
+        if (streamEnd)
+          return;
+        // Calculate amount to read
+        maxToRead = buffer.length - byteCount;
+        if (maxToRead == 0)
+        {
+          wait();
+          continue;
+        }
+        readStartPoint = (startPoint + byteCount) & (buffer.length-1);
+      }
+      
+      // See how to break up the reads into pieces.  We only do one piece right now.
+      if (readStartPoint + maxToRead >= buffer.length)
+        maxToRead = buffer.length - readStartPoint;
+      int amt = sourceStream.read(buffer, readStartPoint, maxToRead);
+      synchronized (this)
+      {
+        if (amt == -1)
+          streamEnd = true;
+        else
+          byteCount += amt;
+        notifyAll();
+      }
+    }
+  }
+  
+  /** Read a byte.
+  */
+  @Override
+  public int read()
+    throws IOException
+  {
+    byte[] b = new byte[1];
+    int amt = read(b,0,1);
+    if (amt == -1)
+      return amt;
+    return ((int)b[0]) & 0xffff;
+  }
+    
+  /** Read lots of bytes.
+  */
+  @Override
+  public int read(byte[] b)
+    throws IOException
+  {
+    return read(b,0,b.length);
+  }
+
+  /** Read lots of specific bytes.
+  */
+  @Override
+  public int read(byte[] b, int off, int len)
+    throws IOException
+  {
+    int totalAmt = 0;
+    while (true)
+    {
+      if (len == 0)
+        return 0;
+      int copyLen;
+      synchronized (this)
+      {
+        if (streamEnd)
+        {
+          if (totalAmt != 0)
+            return totalAmt;
+          return -1;
+        }
+        copyLen = byteCount;
+        if (copyLen > len)
+          copyLen = len;
+        int remLen = buffer.length - startPoint;
+        if (copyLen > remLen)
+          copyLen = remLen;
+      }
+      System.arraycopy(buffer, startPoint, b, off, copyLen);
+      totalAmt += copyLen;
+      len -= copyLen;
+      synchronized (this)
+      {
+        startPoint += copyLen;
+        startPoint &= (buffer.length - 1);
+        byteCount -= copyLen;
+      }
+    }
+  }
+  
+  /** Skip
+  */
+  @Override
+  public long skip(long n)
+    throws IOException
+  {
+    // Do nothing
+    return 0L;
+  }
+
+  /** Get available.
+  */
+  @Override
+  public int available()
+    throws IOException
+  {
+    // Not supported
+    return 0;
+  }
+
+  /** Mark.
+  */
+  @Override
+  public void mark(int readLimit)
+  {
+    // Do nothing
+  }
+
+  /** Reset.
+  */
+  @Override
+  public void reset()
+    throws IOException
+  {
+    // Do nothing
+  }
+
+  /** Check if mark is supported.
+  */
+  @Override
+  public boolean markSupported()
+  {
+    // Not supported
+    return false;
+  }
+
+  /** Close.
+  */
+  @Override
+  public void close()
+    throws IOException
+  {
+    // MHL
+  }
+
+}
+

Propchange: manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
------------------------------------------------------------------------------
    svn:keywords = Id