You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2011/11/07 14:36:34 UTC
svn commit: r1198733 [1/13] - in /incubator/jena/Scratch/AFS/Dev/trunk:
src-archive/riot/comms/ src-archive/riot/comms/client/
src-archive/riot/comms/server0/ src-archive/riot/comms/server1/nio/
src-archive/riot/comms/server1/socket/ src-archive/riot/c...
Author: andy
Date: Mon Nov 7 13:36:30 2011
New Revision: 1198733
URL: http://svn.apache.org/viewvc?rev=1198733&view=rev
Log:
Clean up.
Modified:
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/Comms.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/CommsException.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/End.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenComms.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenStreamEndpoint.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TupleSender.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/Client.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/DirectChannel.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Lifecycle.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Server.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Service.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceState.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceType.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/TokenServer.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/nio/NioServer.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerChannel.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerRequestHandler.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/SocketServer.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/token/Handler.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/token/HandlerFactory.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/io/section/TokenInputSection.java
incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/io/section/TokenOutputSection.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/BindingN.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/DevAFS.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/LogMain.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/RunAFS.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/SizeIndexes.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/StringUTF8.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/TestModelCreation.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/Graph2.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/Index.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/IndexImpl.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/MultiBunch.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/MultiBunchSimple.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/TupleDex.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/g2/TupleIndex.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/rdfs.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/dev/rules.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/table/Table.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/table/TableArray.java
incubator/jena/Scratch/AFS/Dev/trunk/src-dev/table/TableStream.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/libmisc/CodecUTF8.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/ArrayOps.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/ByteArray.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/ByteIO.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/TestSuiteByReflection.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/VarInteger.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/ZigZagInteger.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/Tuple1.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/Tuple2.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/Tuple3.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/Tuple4.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/TupleException.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/TupleN.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/ZTuple.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/migrate/lib/tuple/_Triple.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/OrderedSet.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/avl/AVL.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/avl/AvlIterator.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/avl/AvlNode.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/exthash/ExtHashMem.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/radix/RadixIndex.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/radix/RadixNode.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/radix/RadixNodeVisitor.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/radix/RadixNodeVisitorBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/radix/RadixTree.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/skiplist/SkipList.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/skiplist/SkipListException.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/skiplist/SkipListIterator.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/skiplist/SkipListNode.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/tree/TreeException.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/tree/TreeNode.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/ttree/TTree.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/ttree/TTreeException.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/ttree/TTreeIterator.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/main/java/structure/ttree/TTreeNode.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/migrate/lib/TestArrayOps.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/migrate/lib/TestByteArray.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/migrate/lib/TestVarInteger.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/OrderedSetTest.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/OrderedSetTestBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/OrderedSetTestFactory.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/OrderedSetTestLib.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/TS_Structure.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/avl/TestAVL.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/exthash/ExtHashMemTestBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/exthash/TestExtHashMem.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/radix/RadixRun.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/radix/TestRadix.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/skiplist/SkipListTestBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/skiplist/TestSkipList.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/tree/TestTree.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/ttree/MainTTree.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/structure/ttree/TestTTree.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/test/AVLRun.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/test/ExtHashMemRun.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/test/SkipListRun.java
incubator/jena/Scratch/AFS/Dev/trunk/src-lib/test/java/test/TTreeRun.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/algebra2/AlgebraEngine.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/algebra2/Condition.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/algebra2/JoinEngine.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/algebra2/Table.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/hash/FnvHashFunction.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/hash/HashFunction.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/hash/HashFunctionBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/hash/HashToInt.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/jenaplus/TestMRSW.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/ParserSSE.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/TS_SSE.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/TestSSE_Basic.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/TestSSE_Builder.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/TestSSE_Forms.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TestTokenIO.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenInputStream.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenInputStreamBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenInputStreamWrapper.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenOutputStream.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenOutputStreamWrapper.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/io/TokenOutputStreamWriter.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/riot/lang/LangSSE.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/BaseSearchPF.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/DevSearch.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/GooglePF.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/Search.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/SearchBase.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/SearchException.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/SearchGoogle.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/SearchSite.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/SearchWikipedia.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/search/WikipediaPF.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/storage/Indirection.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/storage/Sequence.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/storage/varblock/Design.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/storage/varrecord/TestVarRecordBlock.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/storage/varrecord/VarRecordBuffer.java
incubator/jena/Scratch/AFS/Dev/trunk/src/main/java/tools/Memory.java
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/Comms.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/Comms.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/Comms.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/Comms.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,30 +16,30 @@
* limitations under the License.
*/
-package riot.comms;
-
-import java.util.ArrayList ;
-import java.util.List ;
-
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-import org.openjena.atlas.lib.Pair ;
-
-public class Comms
-{
- /** Root name for the comms logging */
- public static String logRoot = "riot.comms." ;
-
- private static Logger commsLog = LoggerFactory.getLogger(Comms.class) ;
-
- public static List<TokenStreamEndpoint> client(Pair<String, Integer>...dest)
- {
- List<TokenStreamEndpoint> x = new ArrayList<TokenStreamEndpoint>();
- for ( Pair<String, Integer> addr : dest )
- {
- TokenStreamEndpoint z = TokenStreamEndpoint.create(addr.getLeft(), addr.getRight()) ;
- x.add(z) ;
- }
- return x ;
- }
+package riot.comms;
+
+import java.util.ArrayList ;
+import java.util.List ;
+
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+import org.openjena.atlas.lib.Pair ;
+
+public class Comms
+{
+ /** Root name for the comms logging */
+ public static String logRoot = "riot.comms." ;
+
+ private static Logger commsLog = LoggerFactory.getLogger(Comms.class) ;
+
+ public static List<TokenStreamEndpoint> client(Pair<String, Integer>...dest)
+ {
+ List<TokenStreamEndpoint> x = new ArrayList<TokenStreamEndpoint>();
+ for ( Pair<String, Integer> addr : dest )
+ {
+ TokenStreamEndpoint z = TokenStreamEndpoint.create(addr.getLeft(), addr.getRight()) ;
+ x.add(z) ;
+ }
+ return x ;
+ }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/CommsException.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/CommsException.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/CommsException.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/CommsException.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,19 +16,19 @@
* limitations under the License.
*/
-package riot.comms;
-
-public class CommsException extends RuntimeException
-{
- public CommsException()
- { super(); }
-
- public CommsException(String message)
- { super(message) ; }
-
- public CommsException(String message, Throwable cause)
- { super(message, cause) ; }
-
- public CommsException(Throwable cause)
- { super(cause) ; }
+package riot.comms;
+
+public class CommsException extends RuntimeException
+{
+ public CommsException()
+ { super(); }
+
+ public CommsException(String message)
+ { super(message) ; }
+
+ public CommsException(String message, Throwable cause)
+ { super(message, cause) ; }
+
+ public CommsException(Throwable cause)
+ { super(cause) ; }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/End.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/End.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/End.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/End.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package riot.comms;
-
-public interface End
-{
- void end() ;
+package riot.comms;
+
+public interface End
+{
+ void end() ;
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenComms.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenComms.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenComms.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenComms.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,72 +16,72 @@
* limitations under the License.
*/
-package riot.comms;
-
-import java.io.UnsupportedEncodingException ;
-import java.util.List ;
-
-import org.openjena.riot.tokens.Token ;
-import org.openjena.riot.tokens.TokenType ;
-import org.openjena.riot.tokens.TokenizerText ;
-
-
-public class TokenComms
-{
- public static final char endSectionMarker = 'Z' ;
- public static final char endStreamMarker = 'Y' ;
-
- public static byte[] endSectionMarkerBytes ;
- public static byte[] endStreamMarkerBytes ;
-
-
- static
- {
- try
- {
- endSectionMarkerBytes = cntrlAsString(endSectionMarker).getBytes("ASCII") ;
- endStreamMarkerBytes = cntrlAsString(endStreamMarker).getBytes("ASCII") ;
- } catch (UnsupportedEncodingException ex)
- { // ASCII is required
- throw new Error("ASCII encoding does nto work") ;
- }
- }
-
- //public static final String endMarkerStr = cntrlAsString(endMarker) ;
-
- public static String cntrlAsString(char cntrl)
- {
- return
- Character.toString((char)TokenizerText.CTRL_CHAR)+Character.toString(cntrl);
- }
-
-// public static void sendEndMarker(TupleOutputStream output)
-// {
-// output.sendControl(endMarker) ;
-//// output.startTuple() ;
-//// output.sendString(endMarkerStr) ;
-//// output.endTuple() ;
-// }
-
- public static boolean isEndSectionMarker(List<Token> tuple)
- {
- return isControl(tuple, endSectionMarker) ;
- }
-
- public static boolean isEndStreamMarker(List<Token> tuple)
- {
- return isControl(tuple, endStreamMarker) ;
- }
-
- public static boolean isControl(List<Token> tuple, int ctlCode)
- {
- if ( tuple.size() != 1 ) return false ;
- if ( ! tuple.get(0).hasType(TokenType.CNTRL) )
- return false ;
- int x = tuple.get(0).getCntrlCode() ;
- if ( x == ctlCode )
- return true ;
- return false ;
- }
-
+package riot.comms;
+
+import java.io.UnsupportedEncodingException ;
+import java.util.List ;
+
+import org.openjena.riot.tokens.Token ;
+import org.openjena.riot.tokens.TokenType ;
+import org.openjena.riot.tokens.TokenizerText ;
+
+
+public class TokenComms
+{
+ public static final char endSectionMarker = 'Z' ;
+ public static final char endStreamMarker = 'Y' ;
+
+ public static byte[] endSectionMarkerBytes ;
+ public static byte[] endStreamMarkerBytes ;
+
+
+ static
+ {
+ try
+ {
+ endSectionMarkerBytes = cntrlAsString(endSectionMarker).getBytes("ASCII") ;
+ endStreamMarkerBytes = cntrlAsString(endStreamMarker).getBytes("ASCII") ;
+ } catch (UnsupportedEncodingException ex)
+ { // ASCII is required
+ throw new Error("ASCII encoding does nto work") ;
+ }
+ }
+
+ //public static final String endMarkerStr = cntrlAsString(endMarker) ;
+
+ public static String cntrlAsString(char cntrl)
+ {
+ return
+ Character.toString((char)TokenizerText.CTRL_CHAR)+Character.toString(cntrl);
+ }
+
+// public static void sendEndMarker(TupleOutputStream output)
+// {
+// output.sendControl(endMarker) ;
+//// output.startTuple() ;
+//// output.sendString(endMarkerStr) ;
+//// output.endTuple() ;
+// }
+
+ public static boolean isEndSectionMarker(List<Token> tuple)
+ {
+ return isControl(tuple, endSectionMarker) ;
+ }
+
+ public static boolean isEndStreamMarker(List<Token> tuple)
+ {
+ return isControl(tuple, endStreamMarker) ;
+ }
+
+ public static boolean isControl(List<Token> tuple, int ctlCode)
+ {
+ if ( tuple.size() != 1 ) return false ;
+ if ( ! tuple.get(0).hasType(TokenType.CNTRL) )
+ return false ;
+ int x = tuple.get(0).getCntrlCode() ;
+ if ( x == ctlCode )
+ return true ;
+ return false ;
+ }
+
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenStreamEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenStreamEndpoint.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenStreamEndpoint.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TokenStreamEndpoint.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,125 +16,125 @@
* limitations under the License.
*/
-package riot.comms;
-
-import java.io.IOException ;
-import java.io.InputStream ;
-import java.io.OutputStream ;
-import java.net.Socket ;
-import java.nio.ByteBuffer ;
-import java.nio.channels.SocketChannel ;
-
-import org.openjena.atlas.io.BufferingWriter ;
-import org.openjena.atlas.lib.Sink ;
-import org.openjena.riot.tokens.Tokenizer ;
-import org.openjena.riot.tokens.TokenizerFactory ;
-import riot.comms.client.DirectChannel ;
-import riot.io.TokenInputStream ;
-import riot.io.TokenInputStreamBase ;
-import riot.io.TokenOutputStream ;
-import riot.io.TokenOutputStreamWriter ;
-
-/** A two-way flow of tuples */
-public class TokenStreamEndpoint
-{
- /** Actively create */
- public static TokenStreamEndpoint create(String hostname, int port)
- {
- DirectChannel directChannel = new DirectChannel(hostname, port) ;
- String label = hostname+":"+port ;
- return create(label,label, directChannel.getSocket()) ;
- }
-
- /** Passively create */
- public static TokenStreamEndpoint create(SocketChannel channel)
- {
- Socket socket = channel.socket() ;
- try
- {
- InputStream input = socket.getInputStream() ;
- OutputStream output = socket.getOutputStream() ;
- return new TokenStreamEndpoint("input", "output", socket, input, output) ;
- } catch (IOException ex)
- {
- throw new CommsException(ex) ;
- }
- }
-
- public static TokenStreamEndpoint create(String labelInput, String labelOutput,
- InputStream input, OutputStream output)
- {
- return new TokenStreamEndpoint(labelInput, labelOutput, null, input, output) ;
- }
-
- public static TokenStreamEndpoint create(String labelInput, String labelOutput,
- Socket socket)
- {
- try
- {
- return new TokenStreamEndpoint(labelInput, labelOutput, socket, socket.getInputStream(), socket.getOutputStream()) ;
- } catch (IOException ex)
- {
- throw new CommsException(ex) ;
- }
- }
-
-
- private TokenInputStream tokenInput ;
- private TokenOutputStream tokenOutput ;
- private String labelInput ;
- private String labelOutput ;
- private Socket socket ;
-
- private TokenStreamEndpoint(String labelInput, String labelOutput, Socket socket, InputStream input, OutputStream output)
- {
- this.socket = socket ;
-
- this.labelInput = labelInput ;
- this.labelOutput = labelOutput ;
-
- // Build all the stack of classes so that a raw byte stream is treated as a tuple stream.
- Tokenizer stream = TokenizerFactory.makeTokenizerUTF8(input) ;
- this.tokenInput = new TokenInputStreamBase(labelInput, stream) ;
-
- // Output.
- Sink<ByteBuffer> dest = new BufferingWriter.SinkOutputStream(output) ;
- BufferingWriter bw = new BufferingWriter(dest) ;
- this.tokenOutput = new TokenOutputStreamWriter(labelOutput, bw) ;
- }
-
- @Override
- public String toString()
- {
- return labelInput+" -> "+labelOutput ;
- }
-
-
- public boolean endOfInput()
- {
- return ! tokenInput.hasNext() ;
- }
-
- public TokenInputStream getInput()
- {
- return tokenInput ;
- }
-
- public TokenOutputStream getOutput()
- {
- return tokenOutput ;
- }
-
- public void close()
- {
- try
- {
- tokenInput.close() ;
- if ( socket != null )
- socket.close() ;
- tokenOutput.close() ;
- } catch (Exception ex)
- {}
- }
-
+package riot.comms;
+
+import java.io.IOException ;
+import java.io.InputStream ;
+import java.io.OutputStream ;
+import java.net.Socket ;
+import java.nio.ByteBuffer ;
+import java.nio.channels.SocketChannel ;
+
+import org.openjena.atlas.io.BufferingWriter ;
+import org.openjena.atlas.lib.Sink ;
+import org.openjena.riot.tokens.Tokenizer ;
+import org.openjena.riot.tokens.TokenizerFactory ;
+import riot.comms.client.DirectChannel ;
+import riot.io.TokenInputStream ;
+import riot.io.TokenInputStreamBase ;
+import riot.io.TokenOutputStream ;
+import riot.io.TokenOutputStreamWriter ;
+
+/** A two-way flow of tuples */
+public class TokenStreamEndpoint
+{
+ /** Actively create */
+ public static TokenStreamEndpoint create(String hostname, int port)
+ {
+ DirectChannel directChannel = new DirectChannel(hostname, port) ;
+ String label = hostname+":"+port ;
+ return create(label,label, directChannel.getSocket()) ;
+ }
+
+ /** Passively create */
+ public static TokenStreamEndpoint create(SocketChannel channel)
+ {
+ Socket socket = channel.socket() ;
+ try
+ {
+ InputStream input = socket.getInputStream() ;
+ OutputStream output = socket.getOutputStream() ;
+ return new TokenStreamEndpoint("input", "output", socket, input, output) ;
+ } catch (IOException ex)
+ {
+ throw new CommsException(ex) ;
+ }
+ }
+
+ public static TokenStreamEndpoint create(String labelInput, String labelOutput,
+ InputStream input, OutputStream output)
+ {
+ return new TokenStreamEndpoint(labelInput, labelOutput, null, input, output) ;
+ }
+
+ public static TokenStreamEndpoint create(String labelInput, String labelOutput,
+ Socket socket)
+ {
+ try
+ {
+ return new TokenStreamEndpoint(labelInput, labelOutput, socket, socket.getInputStream(), socket.getOutputStream()) ;
+ } catch (IOException ex)
+ {
+ throw new CommsException(ex) ;
+ }
+ }
+
+
+ private TokenInputStream tokenInput ;
+ private TokenOutputStream tokenOutput ;
+ private String labelInput ;
+ private String labelOutput ;
+ private Socket socket ;
+
+ private TokenStreamEndpoint(String labelInput, String labelOutput, Socket socket, InputStream input, OutputStream output)
+ {
+ this.socket = socket ;
+
+ this.labelInput = labelInput ;
+ this.labelOutput = labelOutput ;
+
+ // Build all the stack of classes so that a raw byte stream is treated as a tuple stream.
+ Tokenizer stream = TokenizerFactory.makeTokenizerUTF8(input) ;
+ this.tokenInput = new TokenInputStreamBase(labelInput, stream) ;
+
+ // Output.
+ Sink<ByteBuffer> dest = new BufferingWriter.SinkOutputStream(output) ;
+ BufferingWriter bw = new BufferingWriter(dest) ;
+ this.tokenOutput = new TokenOutputStreamWriter(labelOutput, bw) ;
+ }
+
+ @Override
+ public String toString()
+ {
+ return labelInput+" -> "+labelOutput ;
+ }
+
+
+ public boolean endOfInput()
+ {
+ return ! tokenInput.hasNext() ;
+ }
+
+ public TokenInputStream getInput()
+ {
+ return tokenInput ;
+ }
+
+ public TokenOutputStream getOutput()
+ {
+ return tokenOutput ;
+ }
+
+ public void close()
+ {
+ try
+ {
+ tokenInput.close() ;
+ if ( socket != null )
+ socket.close() ;
+ tokenOutput.close() ;
+ } catch (Exception ex)
+ {}
+ }
+
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TupleSender.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TupleSender.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TupleSender.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/TupleSender.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,42 +16,42 @@
* limitations under the License.
*/
-package riot.comms;
-
-import riot.io.TokenOutputStream ;
-
-import com.hp.hpl.jena.graph.Node;
-
-import org.openjena.atlas.lib.Tuple;
-
-/** Send tuples of tokens over a TokenOutputStream */
-public class TupleSender
-{
- private TokenOutputStream output ;
-
- public TupleSender(TokenOutputStream endpoint)
- {
- this.output = endpoint ;
- }
-
- public void send(String verb, Tuple<Node> tuple)
- { send(verb, tuple.tuple() ) ; }
-
- public void send(String verb, Node... tuple)
- {
- output.startTuple() ;
- if ( verb != null )
- output.sendWord(verb) ;
- for ( Node n : tuple )
- output.sendNode(n) ;
- output.endTuple() ;
- }
-
- public void close() { output.close() ; }
- public void flush() { output.sync() ; }
-
- public void sendEnd()
- {
- output.endSection() ;
- }
+package riot.comms;
+
+import riot.io.TokenOutputStream ;
+
+import com.hp.hpl.jena.graph.Node;
+
+import org.openjena.atlas.lib.Tuple;
+
+/** Send tuples of tokens over a TokenOutputStream */
+public class TupleSender
+{
+ private TokenOutputStream output ;
+
+ public TupleSender(TokenOutputStream endpoint)
+ {
+ this.output = endpoint ;
+ }
+
+ public void send(String verb, Tuple<Node> tuple)
+ { send(verb, tuple.tuple() ) ; }
+
+ public void send(String verb, Node... tuple)
+ {
+ output.startTuple() ;
+ if ( verb != null )
+ output.sendWord(verb) ;
+ for ( Node n : tuple )
+ output.sendNode(n) ;
+ output.endTuple() ;
+ }
+
+ public void close() { output.close() ; }
+ public void flush() { output.sync() ; }
+
+ public void sendEnd()
+ {
+ output.endSection() ;
+ }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/Client.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/Client.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/Client.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/Client.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,47 +16,47 @@
* limitations under the License.
*/
-package riot.comms.client;
-
-import java.net.InetSocketAddress ;
-import java.net.Socket ;
-
-import riot.comms.TokenStreamEndpoint ;
-import org.openjena.atlas.lib.Cache ;
-import org.openjena.atlas.lib.CacheFactory ;
-import org.openjena.atlas.lib.cache.Getter ;
-
-/** An client endpoint that caches connections to a server */
-public class Client
-{
- static Getter<InetSocketAddress, DirectChannel> getter = new Getter<InetSocketAddress, DirectChannel>(){
- @Override
- public DirectChannel get(InetSocketAddress addr)
- {
- return new DirectChannel(addr) ;
- }
- } ;
- // Eclipse bug?
- static Cache<InetSocketAddress, DirectChannel> cache1 = CacheFactory.createCacheUnbounded() ;
- static Cache<InetSocketAddress, DirectChannel> cache = CacheFactory.createCacheWithGetter(cache1, getter) ;
-
- public static Socket getSocket(String hostname, int port)
- {
- InetSocketAddress addr = new InetSocketAddress(hostname, port) ;
- return cache.get(addr).getSocket() ;
- }
-
- public static TokenStreamEndpoint getTokenStream(String hostname, int port)
- {
- Socket socket = getSocket(hostname, port) ;
- String in = addrStr(hostname, port) ;
- String out = addrStr(hostname, port) ;
- return TokenStreamEndpoint.create(in, out, socket) ;
- }
-
- private static String addrStr(String hostname, int port)
- {
- return hostname+":"+port ;
- }
-
+package riot.comms.client;
+
+import java.net.InetSocketAddress ;
+import java.net.Socket ;
+
+import riot.comms.TokenStreamEndpoint ;
+import org.openjena.atlas.lib.Cache ;
+import org.openjena.atlas.lib.CacheFactory ;
+import org.openjena.atlas.lib.cache.Getter ;
+
+/** An client endpoint that caches connections to a server */
+public class Client
+{
+ static Getter<InetSocketAddress, DirectChannel> getter = new Getter<InetSocketAddress, DirectChannel>(){
+ @Override
+ public DirectChannel get(InetSocketAddress addr)
+ {
+ return new DirectChannel(addr) ;
+ }
+ } ;
+ // Eclipse bug?
+ static Cache<InetSocketAddress, DirectChannel> cache1 = CacheFactory.createCacheUnbounded() ;
+ static Cache<InetSocketAddress, DirectChannel> cache = CacheFactory.createCacheWithGetter(cache1, getter) ;
+
+ public static Socket getSocket(String hostname, int port)
+ {
+ InetSocketAddress addr = new InetSocketAddress(hostname, port) ;
+ return cache.get(addr).getSocket() ;
+ }
+
+ public static TokenStreamEndpoint getTokenStream(String hostname, int port)
+ {
+ Socket socket = getSocket(hostname, port) ;
+ String in = addrStr(hostname, port) ;
+ String out = addrStr(hostname, port) ;
+ return TokenStreamEndpoint.create(in, out, socket) ;
+ }
+
+ private static String addrStr(String hostname, int port)
+ {
+ return hostname+":"+port ;
+ }
+
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/DirectChannel.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/DirectChannel.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/DirectChannel.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/client/DirectChannel.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,53 +16,53 @@
* limitations under the License.
*/
-package riot.comms.client;
-
-import java.io.IOException ;
-import java.net.InetSocketAddress ;
-import java.net.Socket ;
-import java.nio.channels.SocketChannel ;
-
-import riot.comms.CommsException ;
-
-/** Direct cut-through. Use with great case */
-public class DirectChannel
-{
- private InetSocketAddress addr ;
- private SocketChannel channel ;
- private Socket socket ;
- public DirectChannel(String hostname, int port)
- {
- this(new InetSocketAddress(hostname, port)) ;
- }
-
- public DirectChannel(InetSocketAddress addr)
- {
- this.addr = addr ;
- try
- {
-// this.channel = SocketChannel.open() ;
-// this.socket = channel.socket() ;
-// socket.connect(addr) ;
- socket = new Socket(addr.getHostName(), addr.getPort());
- } catch (IOException ex)
- {
- throw new CommsException(ex) ;
- }
- }
-
- public InetSocketAddress getAddr()
- {
- return addr ;
- }
-
- public SocketChannel getChannel()
- {
- return channel ;
- }
-
- public Socket getSocket()
- {
- return socket ;
- }
+package riot.comms.client;
+
+import java.io.IOException ;
+import java.net.InetSocketAddress ;
+import java.net.Socket ;
+import java.nio.channels.SocketChannel ;
+
+import riot.comms.CommsException ;
+
+/** Direct cut-through. Use with great care */
+public class DirectChannel
+{
+ private InetSocketAddress addr ;
+ private SocketChannel channel ;
+ private Socket socket ;
+ public DirectChannel(String hostname, int port)
+ {
+ this(new InetSocketAddress(hostname, port)) ;
+ }
+
+ public DirectChannel(InetSocketAddress addr)
+ {
+ this.addr = addr ;
+ try
+ {
+// this.channel = SocketChannel.open() ;
+// this.socket = channel.socket() ;
+// socket.connect(addr) ;
+ socket = new Socket(addr.getHostName(), addr.getPort());
+ } catch (IOException ex)
+ {
+ throw new CommsException(ex) ;
+ }
+ }
+
+ public InetSocketAddress getAddr()
+ {
+ return addr ;
+ }
+
+ public SocketChannel getChannel()
+ {
+ return channel ;
+ }
+
+ public Socket getSocket()
+ {
+ return socket ;
+ }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Lifecycle.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Lifecycle.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Lifecycle.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Lifecycle.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,18 +16,18 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
-public interface Lifecycle
-{
- /** Asynchronous service start */
- public void start() ;
-
- /** Asynchronous service stop */
- public void stop() ;
-
- /** Run and block */
- public void run() ;
-
- public boolean isActive() ;
+package riot.comms.server0;
+
+public interface Lifecycle
+{
+ /** Asynchronous service start */
+ public void start() ;
+
+ /** Asynchronous service stop */
+ public void stop() ;
+
+ /** Run and block */
+ public void run() ;
+
+ public boolean isActive() ;
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Server.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Server.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Server.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Server.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,55 +16,55 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
+package riot.comms.server0;
+
import riot.comms.server1.socket.ServerRequestHandler ;
import riot.comms.server1.socket.SocketServer ;
-
-public abstract class Server implements Service
-{
- /** Fork a server */
- public static Server fork(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
- {
- Server ps = make(label, serviceType, dispatch, port) ;
- ps.start() ;
- /* In small-scale testing, it helps to kick the server first */
- Thread.yield() ;
- return ps ;
- }
-
- /** Run a server (does not fork - this call does not return until server exits) */
- public static void run(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
- {
- Server ps = make(label, serviceType, dispatch, port) ;
- ps.run() ;
- }
-
- private static Server make(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
- {
- return new SocketServer(port, label, serviceType, dispatch) ;
- }
-
- // --------
- private ServiceState state ;
- private ServiceType serviceType ;
-
-
- protected Server(ServiceType serviceType) {
- this.state = ServiceState.CREATED ;
- this.serviceType = serviceType ;
- }
-
- protected void setState(ServiceState newState) { state = newState ; }
- @Override
- public final ServiceState getState() { return state ; }
-
- @Override
- public boolean isActive()
- {
- return getState() == ServiceState.ACTIVE ;
- }
-
- @Override
- public final ServiceType getServiceType() { return serviceType ; }
+
+public abstract class Server implements Service
+{
+ /** Fork a server */
+ public static Server fork(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
+ {
+ Server ps = make(label, serviceType, dispatch, port) ;
+ ps.start() ;
+ /* In small-scale testing, it helps to kick the server first */
+ Thread.yield() ;
+ return ps ;
+ }
+
+ /** Run a server (does not fork - this call does not return until server exits) */
+ public static void run(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
+ {
+ Server ps = make(label, serviceType, dispatch, port) ;
+ ps.run() ;
+ }
+
+ private static Server make(String label, ServiceType serviceType, ServerRequestHandler dispatch, int port)
+ {
+ return new SocketServer(port, label, serviceType, dispatch) ;
+ }
+
+ // --------
+ private ServiceState state ;
+ private ServiceType serviceType ;
+
+
+ protected Server(ServiceType serviceType) {
+ this.state = ServiceState.CREATED ;
+ this.serviceType = serviceType ;
+ }
+
+ protected void setState(ServiceState newState) { state = newState ; }
+ @Override
+ public final ServiceState getState() { return state ; }
+
+ @Override
+ public boolean isActive()
+ {
+ return getState() == ServiceState.ACTIVE ;
+ }
+
+ @Override
+ public final ServiceType getServiceType() { return serviceType ; }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Service.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Service.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Service.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/Service.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
-public interface Service extends Lifecycle
-{
- public ServiceState getState() ;
- public ServiceType getServiceType() ;
-
+package riot.comms.server0;
+
+public interface Service extends Lifecycle
+{
+ public ServiceState getState() ;
+ public ServiceType getServiceType() ;
+
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceState.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceState.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceState.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceState.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,20 +16,20 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
-public enum ServiceState
-{
- CREATED("created"),
- ACTIVE("active"),
- FINISHED("finished"),
- BROKEN("broken");
-
- private String label ;
-
- private ServiceState(String label) { this.label = label ; }
-
- public String getLabel() { return label ; }
- @Override
- public String toString() { return getLabel() ; }
+package riot.comms.server0;
+
+public enum ServiceState
+{
+ CREATED("created"),
+ ACTIVE("active"),
+ FINISHED("finished"),
+ BROKEN("broken");
+
+ private String label ;
+
+ private ServiceState(String label) { this.label = label ; }
+
+ public String getLabel() { return label ; }
+ @Override
+ public String toString() { return getLabel() ; }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceType.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceType.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceType.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/ServiceType.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +16,21 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
-// VNode?
-public enum ServiceType
-{
- INDEX_SERVICE("index node"),
- DATA_NODE("data node"),
- ANY("any"), PING("ping")
- ;
-
- private String label ;
-
- private ServiceType(String name) { this.label = name ; }
-
- public String getLabel() { return label ; }
- @Override
- public String toString() { return getLabel() ; }
+package riot.comms.server0;
+
+// VNode?
+public enum ServiceType
+{
+ INDEX_SERVICE("index node"),
+ DATA_NODE("data node"),
+ ANY("any"), PING("ping")
+ ;
+
+ private String label ;
+
+ private ServiceType(String name) { this.label = name ; }
+
+ public String getLabel() { return label ; }
+ @Override
+ public String toString() { return getLabel() ; }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/TokenServer.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/TokenServer.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/TokenServer.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server0/TokenServer.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,50 +16,50 @@
* limitations under the License.
*/
-package riot.comms.server0;
-
-import java.io.InputStream ;
-import java.io.OutputStream ;
-
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-import riot.comms.TokenStreamEndpoint ;
+package riot.comms.server0;
+
+import java.io.InputStream ;
+import java.io.OutputStream ;
+
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+import riot.comms.TokenStreamEndpoint ;
import riot.comms.server1.socket.ServerRequestHandler ;
import riot.comms.server1.token.Handler ;
import riot.comms.server1.token.HandlerFactory ;
-
-public class TokenServer
-{
- private static Logger log = LoggerFactory.getLogger(TokenServer.class) ;
-
- public static Server fork(HandlerFactory handlerFactory, ServiceType serviceType, int port)
- {
- Server server = Server.fork("TokenServer", serviceType, new TokenRequestHandler(handlerFactory),port) ;
- return server ;
- }
-
- public static void run(HandlerFactory handlerFactory, ServiceType serviceType, int port)
- {
- Server.run("TokenServer", serviceType, new TokenRequestHandler(handlerFactory),port) ;
- }
-
- static class TokenRequestHandler implements ServerRequestHandler
- {
- private HandlerFactory handlerFactory ;
-
- TokenRequestHandler(HandlerFactory handlerFactory)
- {
- this.handlerFactory = handlerFactory ;
- }
-
- @Override
- public void handleRequests(InputStream inputStream, OutputStream outputStream)
- {
- log.info("Entry: handleRequests") ;
- TokenStreamEndpoint endpoint = TokenStreamEndpoint.create("in", "out", inputStream, outputStream) ;
- Handler handler = handlerFactory.create(endpoint) ;
- handler.loop() ;
- log.info("Exit: handleRequests") ;
- }
- }
+
+public class TokenServer
+{
+ private static Logger log = LoggerFactory.getLogger(TokenServer.class) ;
+
+ public static Server fork(HandlerFactory handlerFactory, ServiceType serviceType, int port)
+ {
+ Server server = Server.fork("TokenServer", serviceType, new TokenRequestHandler(handlerFactory),port) ;
+ return server ;
+ }
+
+ public static void run(HandlerFactory handlerFactory, ServiceType serviceType, int port)
+ {
+ Server.run("TokenServer", serviceType, new TokenRequestHandler(handlerFactory),port) ;
+ }
+
+ static class TokenRequestHandler implements ServerRequestHandler
+ {
+ private HandlerFactory handlerFactory ;
+
+ TokenRequestHandler(HandlerFactory handlerFactory)
+ {
+ this.handlerFactory = handlerFactory ;
+ }
+
+ @Override
+ public void handleRequests(InputStream inputStream, OutputStream outputStream)
+ {
+ log.info("Entry: handleRequests") ;
+ TokenStreamEndpoint endpoint = TokenStreamEndpoint.create("in", "out", inputStream, outputStream) ;
+ Handler handler = handlerFactory.create(endpoint) ;
+ handler.loop() ;
+ log.info("Exit: handleRequests") ;
+ }
+ }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/nio/NioServer.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/nio/NioServer.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/nio/NioServer.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/nio/NioServer.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,146 +16,146 @@
* limitations under the License.
*/
-package riot.comms.server1.nio;
-
-import java.io.IOException ;
-import java.io.InputStream ;
-import java.io.OutputStream ;
-import java.net.InetSocketAddress ;
-import java.net.ServerSocket ;
-import java.net.Socket ;
-import java.net.SocketAddress ;
-import java.nio.channels.SelectionKey ;
-import java.nio.channels.Selector ;
-import java.nio.channels.ServerSocketChannel ;
-import java.nio.channels.SocketChannel ;
-import java.util.Iterator ;
-
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-import riot.comms.CommsException ;
+package riot.comms.server1.nio;
+
+import java.io.IOException ;
+import java.io.InputStream ;
+import java.io.OutputStream ;
+import java.net.InetSocketAddress ;
+import java.net.ServerSocket ;
+import java.net.Socket ;
+import java.net.SocketAddress ;
+import java.nio.channels.SelectionKey ;
+import java.nio.channels.Selector ;
+import java.nio.channels.ServerSocketChannel ;
+import java.nio.channels.SocketChannel ;
+import java.util.Iterator ;
+
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+import riot.comms.CommsException ;
import riot.comms.server0.Server ;
import riot.comms.server0.Service ;
import riot.comms.server0.ServiceType ;
import riot.comms.server1.socket.ServerRequestHandler ;
-
-/** Basic channel handling with NIO */
-class NioServer extends Server implements Service, Runnable
-{
- // Needs work - the handling of request sis still socket style.
- // Change to selectors and have a tread pool that can pick up continuations?
- // See other systems - maybe this is no help.
- private static Logger log = LoggerFactory.getLogger(NioServer.class) ;
-
- private int port ;
- private SocketAddress serverEndpoint ;
-
- private ServerRequestHandler requestHandler ;
-
- public NioServer(int port, String label, ServiceType serviceType, ServerRequestHandler requestHandler)
- {
- super(serviceType) ;
- this.port = port ;
- serverEndpoint = new InetSocketAddress(port) ;
- this.requestHandler = requestHandler ;
- }
-
- volatile boolean shutdown = false ;
-
- @Override
- public void start()
- {}
-
- /** Gracefully stop */
- @Override
- public void stop()
- {
- shutdown = true ;
- }
-
- @Override
- public void run()
- {
- server() ;
- }
-
- void server()
- {
- try
- {
- Selector selector = Selector.open() ;
- ServerSocketChannel chann = ServerSocketChannel.open() ; // Listener
- ServerSocket sock = chann.socket() ;
-
- sock.bind(serverEndpoint, 10) ;
- chann.configureBlocking(false) ;
-
- // Only valid operation for this socket - it's a listener.
- int ops = SelectionKey.OP_ACCEPT ;
- //| SelectionKey.OP_CONNECT
- //| SelectionKey.OP_READ
- //| SelectionKey.OP_WRITE
- //;
- SelectionKey key = chann.register(selector, ops) ;
-
- while ( true )
- {
- if ( shutdown )
- break ;
-
- int readyCount = selector.select() ;
- if ( readyCount == 0 )
- continue ;
-
- if ( log.isTraceEnabled() )
- log.trace("Ready Count = "+readyCount) ;
-
- Iterator<SelectionKey> iter = selector.selectedKeys().iterator() ;
- for ( ; iter.hasNext(); )
- {
- SelectionKey sKey = iter.next();
- //log.info("sKey = "+sKey) ;
-
- if ( sKey.isAcceptable() )
- {
- if ( log.isDebugEnabled() )
- log.debug("Accept") ;
- ServerSocketChannel server = (ServerSocketChannel)key.channel() ;
-
- SocketChannel ch = server.accept() ;
- //ch.configureBlocking(true) ;
- Socket socket = ch.socket() ;
- InputStream inputStream = socket.getInputStream() ;
- OutputStream outputStream = socket.getOutputStream() ;
- requestHandler.handleRequests(inputStream, outputStream) ;
- //ch.register(selector, SelectionKey.OP_READ) ;
- }
- else if ( sKey.isConnectable() )
- {
- // Unexpected because this is a server (does not initiate outgoing conenctions).
- throw new CommsException("Unexpected connectable key") ;
- }
-
- else if ( sKey.isReadable() )
- {
- if ( log.isDebugEnabled() )
- log.debug("Read event") ;
- //netEventHandler.read(sKey) ;
- }
- else if ( sKey.isWritable() )
- {
- if ( log.isDebugEnabled() )
- log.debug("Write event") ;
- //netEventHandler.write(sKey) ;
- }
- else { throw new CommsException("Unexpected selector key") ; }
- iter.remove() ;
- }
- }
- } catch (IOException ex)
- {
- ex.printStackTrace();
- }
- log.info("Server exit") ;
- }
+
+/** Basic channel handling with NIO */
+class NioServer extends Server implements Service, Runnable
+{
+ // Needs work - the handling of requests is still socket style.
+ // Change to selectors and have a thread pool that can pick up continuations?
+ // See other systems - maybe this is no help.
+ private static Logger log = LoggerFactory.getLogger(NioServer.class) ;
+
+ private int port ;
+ private SocketAddress serverEndpoint ;
+
+ private ServerRequestHandler requestHandler ;
+
+ public NioServer(int port, String label, ServiceType serviceType, ServerRequestHandler requestHandler)
+ {
+ super(serviceType) ;
+ this.port = port ;
+ serverEndpoint = new InetSocketAddress(port) ;
+ this.requestHandler = requestHandler ;
+ }
+
+ volatile boolean shutdown = false ;
+
+ @Override
+ public void start()
+ {}
+
+ /** Gracefully stop */
+ @Override
+ public void stop()
+ {
+ shutdown = true ;
+ }
+
+ @Override
+ public void run()
+ {
+ server() ;
+ }
+
+ void server()
+ {
+ try
+ {
+ Selector selector = Selector.open() ;
+ ServerSocketChannel chann = ServerSocketChannel.open() ; // Listener
+ ServerSocket sock = chann.socket() ;
+
+ sock.bind(serverEndpoint, 10) ;
+ chann.configureBlocking(false) ;
+
+ // Only valid operation for this socket - it's a listener.
+ int ops = SelectionKey.OP_ACCEPT ;
+ //| SelectionKey.OP_CONNECT
+ //| SelectionKey.OP_READ
+ //| SelectionKey.OP_WRITE
+ //;
+ SelectionKey key = chann.register(selector, ops) ;
+
+ while ( true )
+ {
+ if ( shutdown )
+ break ;
+
+ int readyCount = selector.select() ;
+ if ( readyCount == 0 )
+ continue ;
+
+ if ( log.isTraceEnabled() )
+ log.trace("Ready Count = "+readyCount) ;
+
+ Iterator<SelectionKey> iter = selector.selectedKeys().iterator() ;
+ for ( ; iter.hasNext(); )
+ {
+ SelectionKey sKey = iter.next();
+ //log.info("sKey = "+sKey) ;
+
+ if ( sKey.isAcceptable() )
+ {
+ if ( log.isDebugEnabled() )
+ log.debug("Accept") ;
+ ServerSocketChannel server = (ServerSocketChannel)key.channel() ;
+
+ SocketChannel ch = server.accept() ;
+ //ch.configureBlocking(true) ;
+ Socket socket = ch.socket() ;
+ InputStream inputStream = socket.getInputStream() ;
+ OutputStream outputStream = socket.getOutputStream() ;
+ requestHandler.handleRequests(inputStream, outputStream) ;
+ //ch.register(selector, SelectionKey.OP_READ) ;
+ }
+ else if ( sKey.isConnectable() )
+ {
+ // Unexpected because this is a server (does not initiate outgoing connections).
+ throw new CommsException("Unexpected connectable key") ;
+ }
+
+ else if ( sKey.isReadable() )
+ {
+ if ( log.isDebugEnabled() )
+ log.debug("Read event") ;
+ //netEventHandler.read(sKey) ;
+ }
+ else if ( sKey.isWritable() )
+ {
+ if ( log.isDebugEnabled() )
+ log.debug("Write event") ;
+ //netEventHandler.write(sKey) ;
+ }
+ else { throw new CommsException("Unexpected selector key") ; }
+ iter.remove() ;
+ }
+ }
+ } catch (IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ log.info("Server exit") ;
+ }
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerChannel.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerChannel.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerChannel.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerChannel.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,73 +16,73 @@
* limitations under the License.
*/
-package riot.comms.server1.socket;
-
-import java.io.IOException ;
-import java.io.InputStream ;
-import java.io.OutputStream ;
-import java.net.Socket ;
-
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-
-public class ServerChannel implements Runnable
-{
- private final static int StreamBufferSize = 32 * 1024 ;
- private final Logger log ;
- private final Socket socket ;
- private final ServerRequestHandler handler ;
- private final long channelId ;
-
- public ServerChannel(String label,
- Socket socket,
- ServerRequestHandler requestHandler,
- long channelId)
- {
- this.socket = socket ;
- this.channelId = channelId ;
- this.log = LoggerFactory.getLogger(label+"["+channelId+"]") ;
- this.handler = requestHandler ;
- }
-
- @Override
- public void run()
- {
- try {
- InputStream inputStream = socket.getInputStream() ;
- OutputStream outputStream = socket.getOutputStream() ;
- log.info("ServerChannel: " + socket.getRemoteSocketAddress()) ;
- handler.handleRequests(inputStream, outputStream) ;
-
-// while(!isInterrupted() && !socket.isClosed() )
-// {
-// if ( ! handler.handleRequest(inputStream, outputStream) )
-// break ;
-// outputStream.flush();
-// }
-// if(isInterrupted())
-// log.info(Thread.currentThread().getName()
-// + " has been interrupted, closing session.");
-// } catch(EOFException e) {
-// log.info("Client " + socket.getRemoteSocketAddress() + " disconnected.");
- } catch(IOException e) {
-// // if this is an unexpected
-// if(!isClosed)
- log.error("IOException", e);
- } finally {
- try {
- if(!socket.isClosed())
- socket.close();
- } catch(Exception e) {
- log.error("Error while closing socket", e);
- }
- }
- }
-
-
- private boolean isInterrupted()
- {
- return Thread.currentThread().isInterrupted();
- }
-
+package riot.comms.server1.socket;
+
+import java.io.IOException ;
+import java.io.InputStream ;
+import java.io.OutputStream ;
+import java.net.Socket ;
+
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+public class ServerChannel implements Runnable
+{
+ private final static int StreamBufferSize = 32 * 1024 ;
+ private final Logger log ;
+ private final Socket socket ;
+ private final ServerRequestHandler handler ;
+ private final long channelId ;
+
+ public ServerChannel(String label,
+ Socket socket,
+ ServerRequestHandler requestHandler,
+ long channelId)
+ {
+ this.socket = socket ;
+ this.channelId = channelId ;
+ this.log = LoggerFactory.getLogger(label+"["+channelId+"]") ;
+ this.handler = requestHandler ;
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ InputStream inputStream = socket.getInputStream() ;
+ OutputStream outputStream = socket.getOutputStream() ;
+ log.info("ServerChannel: " + socket.getRemoteSocketAddress()) ;
+ handler.handleRequests(inputStream, outputStream) ;
+
+// while(!isInterrupted() && !socket.isClosed() )
+// {
+// if ( ! handler.handleRequest(inputStream, outputStream) )
+// break ;
+// outputStream.flush();
+// }
+// if(isInterrupted())
+// log.info(Thread.currentThread().getName()
+// + " has been interrupted, closing session.");
+// } catch(EOFException e) {
+// log.info("Client " + socket.getRemoteSocketAddress() + " disconnected.");
+ } catch(IOException e) {
+// // if this is an unexpected
+// if(!isClosed)
+ log.error("IOException", e);
+ } finally {
+ try {
+ if(!socket.isClosed())
+ socket.close();
+ } catch(Exception e) {
+ log.error("Error while closing socket", e);
+ }
+ }
+ }
+
+
+ private boolean isInterrupted()
+ {
+ return Thread.currentThread().isInterrupted();
+ }
+
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerRequestHandler.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerRequestHandler.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerRequestHandler.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/ServerRequestHandler.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,13 +16,13 @@
* limitations under the License.
*/
-package riot.comms.server1.socket;
-
-import java.io.InputStream ;
-import java.io.OutputStream ;
-
-public interface ServerRequestHandler
-{
- /** Handle a stream of requests */
- void handleRequests(InputStream inputStream, OutputStream outputStream) ;
+package riot.comms.server1.socket;
+
+import java.io.InputStream ;
+import java.io.OutputStream ;
+
+public interface ServerRequestHandler
+{
+ /** Handle a stream of requests */
+ void handleRequests(InputStream inputStream, OutputStream outputStream) ;
}
Modified: incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/SocketServer.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/SocketServer.java?rev=1198733&r1=1198732&r2=1198733&view=diff
==============================================================================
--- incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/SocketServer.java (original)
+++ incubator/jena/Scratch/AFS/Dev/trunk/src-archive/riot/comms/server1/socket/SocketServer.java Mon Nov 7 13:36:30 2011
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,211 +16,211 @@
* limitations under the License.
*/
-/* Project Voldemort
- * Copyright 2008-2009 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package riot.comms.server1.socket;
-
-import java.io.IOException ;
-import java.net.BindException ;
-import java.net.InetSocketAddress ;
-import java.net.ServerSocket ;
-import java.net.Socket ;
-import java.net.SocketException ;
-import java.util.concurrent.Executor ;
-import java.util.concurrent.RejectedExecutionHandler ;
-import java.util.concurrent.SynchronousQueue ;
-import java.util.concurrent.ThreadFactory ;
-import java.util.concurrent.ThreadPoolExecutor ;
-import java.util.concurrent.TimeUnit ;
-import java.util.concurrent.atomic.AtomicLong ;
-
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-import riot.comms.CommsException ;
+/* Project Voldemort
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package riot.comms.server1.socket;
+
+import java.io.IOException ;
+import java.net.BindException ;
+import java.net.InetSocketAddress ;
+import java.net.ServerSocket ;
+import java.net.Socket ;
+import java.net.SocketException ;
+import java.util.concurrent.Executor ;
+import java.util.concurrent.RejectedExecutionHandler ;
+import java.util.concurrent.SynchronousQueue ;
+import java.util.concurrent.ThreadFactory ;
+import java.util.concurrent.ThreadPoolExecutor ;
+import java.util.concurrent.TimeUnit ;
+import java.util.concurrent.atomic.AtomicLong ;
+
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+import riot.comms.CommsException ;
import riot.comms.server0.Server ;
import riot.comms.server0.Service ;
import riot.comms.server0.ServiceState ;
import riot.comms.server0.ServiceType ;
-
-/** Classical socket-based, non NIO, server */
-public class SocketServer extends Server implements Service, Runnable
-{
- // Acknowlegments:
- // Understanding how this should be done is taken from looking at
- // Voldemort (Apache license) and Jetty (Eclipse license), then
- // writing what Cohort needs.
-
- // This code borrows from Voldemort (Apache license/copyright LinkedIn).
-
- // Many thanks to those open source projects. Saved me a lot of
- // time underdtanding how this should all work.
-
- // There seems to be both NIO and sockets-based implements aronud (guess: the tradeoffs aren't clear cut).
-
- private static AtomicLong counter = new AtomicLong(0) ;
- private AtomicLong counterChannels = new AtomicLong(0) ;
- private Logger log = null ;
-
- private int port ;
- // Externalize parameters?
- private final int corePoolSize = 5 ; // Later, more
- private final int maxPoolSize = 10 ;
- private final int sessionsPerConnection = 10 ;
- private final Executor executor ;
- private final long id ;
- private final ServiceType serviceType ;
- private final ThreadGroup threadGroup ; // Thread group for all request handlers (not this server thread).
- private final String label ;
- private final ServerRequestHandler requestHandler ;
- private Thread thread ;
- private ServerSocket serverSocket ;
-
- public SocketServer(int port, String label, ServiceType serviceType, ServerRequestHandler requestHandler)
- {
- super(serviceType) ;
- this.label = label ;
- this.serviceType = serviceType ;
- this.requestHandler = requestHandler ;
- this.id = counter.incrementAndGet() ;
- String logName = String.format("%s:%s[%d]", this.getClass().getName(), label, id) ;
- log = LoggerFactory.getLogger(logName) ;
- this.port = port;
-
- ThreadFactory threadFactory = new ThreadFactory() {
- private AtomicLong threadId = new AtomicLong(0);
- @Override
- public Thread newThread(Runnable runnable) {
- String name = SocketServer.this.label + "-" + threadId.getAndIncrement();
- log.info("Thread: "+name) ;
- Thread t = new Thread(threadGroup, runnable, name);
- t.setDaemon(true);
- return t;
- }
- } ;
-
- RejectedExecutionHandler rejHandler = new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
- {
- log.warn("Reject execution") ;
- }
- } ;
-
- // Executor for incoming accept connections
- // This is the most general ThreadPoolExecutor constructor.
- this.executor = new ThreadPoolExecutor(corePoolSize,
- maxPoolSize,
- 1, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), // Zero buffered queue ???? Easier for now.
- threadFactory, // ThreadFactory
- rejHandler);
- // This is the threadGroup for all
- this.threadGroup = new ThreadGroup(logName) ;
- }
-
- private boolean isInterrupted()
- {
- return Thread.currentThread().isInterrupted();
- }
-
- @Override
- public void run()
- {
- log.info("Starting "+serviceType.getLabel()+" server [" + label + "] on port " + port);
- try {
- serverSocket = new ServerSocket();
- serverSocket.bind(new InetSocketAddress(port));
-
- //serverSocket.setReceiveBufferSize(this.socketBufferSize);
- //startedStatusQueue.put(SUCCESS);
- // Interrupted?
- while( !serverSocket.isClosed() )
- {
- final Socket socket = serverSocket.accept();
- log.info("Accept: "+socket.getRemoteSocketAddress().toString()) ;
-
- setupSocket(socket);
-
- long channelId = counterChannels.getAndIncrement();
- executor.execute(new ServerChannel(label,
- socket,
- requestHandler,
- channelId));
- }
- } catch(BindException e) {
- log.error("Could not bind to port " + port + ".");
- //startedStatusQueue.offer(e);
- throw new CommsException(e);
- } catch(SocketException e) {
- //startedStatusQueue.offer(e);
- // If we have been manually shutdown, ignore
- if(!isInterrupted())
- log.error("Error in server: ", e);
- } catch(IOException e) {
- //startedStatusQueue.offer(e);
- throw new CommsException(e);
- } catch(Throwable t) {
- log.error("Throwable", t);
- //startedStatusQueue.offer(t);
- if(t instanceof Error)
- throw (Error) t;
- else if(t instanceof RuntimeException)
- throw (RuntimeException) t;
- throw new CommsException(t);
- } finally {
- if(serverSocket != null) {
- try {
- serverSocket.close();
- } catch(IOException e) {
- log.warn("Error while closing server socket", e);
- }
- }
-
- }
- }
-
- private void setupSocket(Socket socket) throws SocketException {
- socket.setTcpNoDelay(true);
-// socket.setSendBufferSize(this.socketBufferSize);
-// if(socket.getReceiveBufferSize() != this.socketBufferSize)
-// log.debug("Requested socket receive buffer size was " + this.socketBufferSize
-// + " bytes but actual size is " + socket.getReceiveBufferSize() + " bytes.");
-// if(socket.getSendBufferSize() != this.socketBufferSize)
-// log.debug("Requested socket send buffer size was " + this.socketBufferSize
-// + " bytes but actual size is " + socket.getSendBufferSize() + " bytes.");
- }
-
- @Override
- public void start()
- {
- if ( getState() != ServiceState.CREATED )
- log.error("Out of sequence call to 'start'") ;
- setState(ServiceState.ACTIVE) ;
- this.thread = new Thread(this) ;
- this.thread.start() ;
- }
-
- @Override
- public void stop()
- {
- if ( getState() == ServiceState.CREATED )
- log.error("Not started") ;
- threadGroup.interrupt() ;
- thread.interrupt() ;
- setState(ServiceState.FINISHED) ;
- }
+
+/** Classical socket-based, non NIO, server */
+public class SocketServer extends Server implements Service, Runnable
+{
+ // Acknowledgments:
+ // Understanding how this should be done is taken from looking at
+ // Voldemort (Apache license) and Jetty (Eclipse license), then
+ // writing what Cohort needs.
+
+ // This code borrows from Voldemort (Apache license/copyright LinkedIn).
+
+ // Many thanks to those open source projects. Saved me a lot of
+ // time understanding how this should all work.
+
+ // There seem to be both NIO and socket-based implementations around (guess: the tradeoffs aren't clear-cut).
+
+ private static AtomicLong counter = new AtomicLong(0) ;
+ private AtomicLong counterChannels = new AtomicLong(0) ;
+ private Logger log = null ;
+
+ private int port ;
+ // Externalize parameters?
+ private final int corePoolSize = 5 ; // Later, more
+ private final int maxPoolSize = 10 ;
+ private final int sessionsPerConnection = 10 ;
+ private final Executor executor ;
+ private final long id ;
+ private final ServiceType serviceType ;
+ private final ThreadGroup threadGroup ; // Thread group for all request handlers (not this server thread).
+ private final String label ;
+ private final ServerRequestHandler requestHandler ;
+ private Thread thread ;
+ private ServerSocket serverSocket ;
+
+ public SocketServer(int port, String label, ServiceType serviceType, ServerRequestHandler requestHandler)
+ {
+ super(serviceType) ;
+ this.label = label ;
+ this.serviceType = serviceType ;
+ this.requestHandler = requestHandler ;
+ this.id = counter.incrementAndGet() ;
+ String logName = String.format("%s:%s[%d]", this.getClass().getName(), label, id) ;
+ log = LoggerFactory.getLogger(logName) ;
+ this.port = port;
+
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private AtomicLong threadId = new AtomicLong(0);
+ @Override
+ public Thread newThread(Runnable runnable) {
+ String name = SocketServer.this.label + "-" + threadId.getAndIncrement();
+ log.info("Thread: "+name) ;
+ Thread t = new Thread(threadGroup, runnable, name);
+ t.setDaemon(true);
+ return t;
+ }
+ } ;
+
+ RejectedExecutionHandler rejHandler = new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+ {
+ log.warn("Reject execution") ;
+ }
+ } ;
+
+ // Executor for incoming accept connections
+ // This is the most general ThreadPoolExecutor constructor.
+ this.executor = new ThreadPoolExecutor(corePoolSize,
+ maxPoolSize,
+ 1, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), // Zero buffered queue ???? Easier for now.
+ threadFactory, // ThreadFactory
+ rejHandler);
+ // This is the threadGroup for all request handler threads (not this server thread).
+ this.threadGroup = new ThreadGroup(logName) ;
+ }
+
+ private boolean isInterrupted()
+ {
+ return Thread.currentThread().isInterrupted();
+ }
+
+ @Override
+ public void run()
+ {
+ log.info("Starting "+serviceType.getLabel()+" server [" + label + "] on port " + port);
+ try {
+ serverSocket = new ServerSocket();
+ serverSocket.bind(new InetSocketAddress(port));
+
+ //serverSocket.setReceiveBufferSize(this.socketBufferSize);
+ //startedStatusQueue.put(SUCCESS);
+ // Interrupted?
+ while( !serverSocket.isClosed() )
+ {
+ final Socket socket = serverSocket.accept();
+ log.info("Accept: "+socket.getRemoteSocketAddress().toString()) ;
+
+ setupSocket(socket);
+
+ long channelId = counterChannels.getAndIncrement();
+ executor.execute(new ServerChannel(label,
+ socket,
+ requestHandler,
+ channelId));
+ }
+ } catch(BindException e) {
+ log.error("Could not bind to port " + port + ".");
+ //startedStatusQueue.offer(e);
+ throw new CommsException(e);
+ } catch(SocketException e) {
+ //startedStatusQueue.offer(e);
+ // If we have been manually shutdown, ignore
+ if(!isInterrupted())
+ log.error("Error in server: ", e);
+ } catch(IOException e) {
+ //startedStatusQueue.offer(e);
+ throw new CommsException(e);
+ } catch(Throwable t) {
+ log.error("Throwable", t);
+ //startedStatusQueue.offer(t);
+ if(t instanceof Error)
+ throw (Error) t;
+ else if(t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ throw new CommsException(t);
+ } finally {
+ if(serverSocket != null) {
+ try {
+ serverSocket.close();
+ } catch(IOException e) {
+ log.warn("Error while closing server socket", e);
+ }
+ }
+
+ }
+ }
+
+ private void setupSocket(Socket socket) throws SocketException {
+ socket.setTcpNoDelay(true);
+// socket.setSendBufferSize(this.socketBufferSize);
+// if(socket.getReceiveBufferSize() != this.socketBufferSize)
+// log.debug("Requested socket receive buffer size was " + this.socketBufferSize
+// + " bytes but actual size is " + socket.getReceiveBufferSize() + " bytes.");
+// if(socket.getSendBufferSize() != this.socketBufferSize)
+// log.debug("Requested socket send buffer size was " + this.socketBufferSize
+// + " bytes but actual size is " + socket.getSendBufferSize() + " bytes.");
+ }
+
+ @Override
+ public void start()
+ {
+ if ( getState() != ServiceState.CREATED )
+ log.error("Out of sequence call to 'start'") ;
+ setState(ServiceState.ACTIVE) ;
+ this.thread = new Thread(this) ;
+ this.thread.start() ;
+ }
+
+ @Override
+ public void stop()
+ {
+ if ( getState() == ServiceState.CREATED )
+ log.error("Not started") ;
+ threadGroup.interrupt() ;
+ thread.interrupt() ;
+ setState(ServiceState.FINISHED) ;
+ }
}