You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC

svn commit: r1157566 [2/23] - in /qpid: branches/rg-amqp-1-0-sandbox/qpid/java/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/s...

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,326 @@
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Command line tool which receives messages from an address and stores the body of each message as a file in a
+ * directory, taking the name of the file from the "filename" application property of the message.
+ * <p>
+ * Files are staged in a ".tmp" sub-directory of the target directory. While it is being written a file is named
+ * "~&lt;filename&gt;~"; once completely written it is renamed to "&lt;hex delivery tag&gt;.&lt;filename&gt;" and the
+ * message is acknowledged. When the delivery is settled the file is moved into the target directory. Staged files
+ * found at start-up are passed to the receiving link as unsettled deliveries in the Accepted state.
+ */
+public class Filereceiver extends Util
+{
+    private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:";
+
+    protected Filereceiver(String[] args)
+    {
+        super(args);
+    }
+
+    // The has...Option() overrides below select which of the options known to Util apply to this tool
+    // (presumably consulted by Util when it builds the command line parser - confirm against Util).
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return false;
+    }
+
+    /**
+     * Prints the usage message for this tool followed by the description of the given options.
+     *
+     * @param options the command line options to describe
+     */
+    @Override
+    protected void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+
+    }
+
+    /**
+     * Receives messages from the address given as the first argument and writes them as files into the directory
+     * given as the second argument. The loop ends when no message arrives within ten seconds (the first receive
+     * waits without a timeout when the block option is set).
+     */
+    @Override
+    protected void run()
+    {
+        final String queue = getArgs()[0];
+        final String directoryName = getArgs()[1];
+
+        try
+        {
+            Connection conn = newConnection();
+
+            Session session = conn.createSession();
+
+            final File directory = new File(directoryName);
+            if(directory.isDirectory() && directory.canWrite())
+            {
+                File tmpDirectory = new File(directoryName, ".tmp");
+                if(!tmpDirectory.exists() && !tmpDirectory.mkdir())
+                {
+                    throw new IOException("Unable to create directory: " + tmpDirectory);
+                }
+
+                // File.list() returns null when the path is not a directory or cannot be read
+                String[] unsettledFiles = tmpDirectory.list();
+                if(unsettledFiles == null)
+                {
+                    throw new IOException("Unable to list directory: " + tmpDirectory);
+                }
+
+                Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
+                final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
+
+                Accepted accepted = new Accepted();
+
+                for(String fileName : unsettledFiles)
+                {
+                    File theFile  = new File(tmpDirectory, fileName);
+                    if(theFile.isFile())
+                    {
+                        if(fileName.startsWith("~") && fileName.endsWith("~"))
+                        {
+                            // a file which was still being written when an earlier run stopped
+                            theFile.delete();
+                        }
+                        else
+                        {
+                            // staged files are named "<hex delivery tag>.<filename>"
+                            int splitPoint = fileName.indexOf(".");
+                            byte[] bytes = splitPoint < 0 ? null : decodeHex(fileName.substring(0, splitPoint));
+                            if(bytes == null)
+                            {
+                                System.err.println("Ignoring unexpected file in " + tmpDirectory + ": " + fileName);
+                            }
+                            else
+                            {
+                                Binary deliveryTag = new Binary(bytes);
+                                unsettled.put(deliveryTag, accepted);
+                                unsettledFileNames.put(deliveryTag, fileName);
+                            }
+                        }
+                    }
+
+                }
+
+                Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
+                                                    unsettled);
+
+                Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled();
+
+                // staged files whose delivery tag the peer does not report as unsettled are moved straight into
+                // the target directory
+                for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet())
+                {
+                    if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
+                    {
+                        String tmpFileName = entry.getValue();
+                        moveIntoPlace(new File(tmpDirectory, tmpFileName),
+                                      new File(directory, tmpFileName.substring(tmpFileName.indexOf(".") + 1)));
+                    }
+                }
+
+
+                int credit = 10;
+
+                r.setCredit(UnsignedInteger.valueOf(credit), true);
+
+
+                int received = 0;
+                Message m = null;
+                do
+                {
+                    m = isBlock() && received == 0 ? r.receive() : r.receive(10000);
+                    if(m != null)
+                    {
+                        if(m.isResume() && unsettled.containsKey(m.getDeliveryTag()))
+                        {
+                            // resumed delivery of a file which is already staged: only the move is outstanding
+                            final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag());
+                            final File unsettledFile = new File(tmpDirectory,
+                                    tmpFileName);
+                            r.acknowledge(m, new Receiver.SettledAction()
+                                {
+                                    public void onSettled(final Binary deliveryTag)
+                                    {
+                                        int splitPoint = tmpFileName.indexOf(".");
+
+                                        String fileName = tmpFileName.substring(splitPoint+1);
+
+                                        moveIntoPlace(unsettledFile, new File(directory, fileName));
+                                        unsettledFileNames.remove(deliveryTag);
+                                    }
+                                });
+                        }
+                        else
+                        {
+                            received++;
+                            List<Section> sections = m.getPayload();
+                            Binary deliveryTag = m.getDeliveryTag();
+                            StringBuilder tagNameBuilder = new StringBuilder();
+
+                            // the delivery tag is written as lower case hex, two digits per byte
+                            ByteBuffer dtbuf = deliveryTag.asByteBuffer();
+                            while(dtbuf.hasRemaining())
+                            {
+                                tagNameBuilder.append(String.format("%02x", dtbuf.get()));
+                            }
+
+
+                            ApplicationProperties properties = null;
+                            List<Binary> data = new ArrayList<Binary>();
+                            int totalSize = 0;
+                            for(Section section : sections)
+                            {
+                                if(section instanceof ApplicationProperties)
+                                {
+                                    properties = (ApplicationProperties) section;
+                                }
+                                else if(section instanceof AmqpValue)
+                                {
+                                    AmqpValue value = (AmqpValue) section;
+                                    if(value.getValue() instanceof Binary)
+                                    {
+                                        Binary binary = (Binary) value.getValue();
+                                        data.add(binary);
+                                        totalSize += binary.getLength();
+
+                                    }
+                                    else
+                                    {
+                                        // TODO exception
+                                    }
+                                }
+                                else if(section instanceof Data)
+                                {
+                                    Data value = (Data) section;
+                                    Binary binary = value.getValue();
+                                    data.add(binary);
+                                    totalSize += binary.getLength();
+
+                                }
+                            }
+
+                            // the name comes from the sender of the message, so it is reduced to a plain file name
+                            // before being used to build a path
+                            final String fileName = properties == null
+                                                    ? null
+                                                    : sanitizeFileName(properties.getValue().get("filename"));
+                            if(fileName == null)
+                            {
+                                System.err.println("Ignoring message without a usable filename property");
+                            }
+                            else
+                            {
+                                byte[] fileData = new byte[totalSize];
+                                ByteBuffer buf = ByteBuffer.wrap(fileData);
+                                for(Binary bin : data)
+                                {
+                                    buf.put(bin.asByteBuffer());
+                                }
+                                File outputFile = new File(tmpDirectory, "~"+fileName+"~");
+                                if(outputFile.exists())
+                                {
+                                    outputFile.delete();
+                                }
+                                FileOutputStream fos = new FileOutputStream(outputFile);
+                                try
+                                {
+                                    fos.write(fileData);
+                                    fos.flush();
+                                }
+                                finally
+                                {
+                                    fos.close();
+                                }
+
+                                final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." +
+                                                                   fileName);
+                                if(!outputFile.renameTo(unsettledFile))
+                                {
+                                    System.err.println("Unable to move " + outputFile + " to " + unsettledFile);
+                                }
+                                r.acknowledge(m, new Receiver.SettledAction()
+                                {
+                                    public void onSettled(final Binary deliveryTag)
+                                    {
+                                        moveIntoPlace(unsettledFile, new File(directory, fileName));
+                                    }
+                                });
+
+                            }
+                        }
+                    }
+                }
+                while(m != null);
+
+
+                r.close();
+            }
+            else
+            {
+                System.err.println("No such directory: " + directoryName);
+            }
+            session.close();
+            conn.close();
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();
+        }
+        catch (FileNotFoundException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+    }
+
+    /**
+     * Moves a staged file to its final location. A message is printed when a file of the same name already exists
+     * at the destination (the move is still attempted) and when the move fails.
+     *
+     * @param source the staged file
+     * @param dest   the location the file is to be moved to
+     */
+    private static void moveIntoPlace(File source, File dest)
+    {
+        if(dest.exists())
+        {
+            System.err.println("Duplicate detected - filename " + dest.getName());
+        }
+        if(!source.renameTo(dest))
+        {
+            System.err.println("Unable to move " + source + " to " + dest);
+        }
+    }
+
+    /**
+     * Decodes a string of hexadecimal digits, two digits per byte.
+     *
+     * @param hex the text to decode
+     * @return the decoded bytes, or null if the text has an odd length or contains a character which is not a
+     *         hexadecimal digit
+     */
+    private static byte[] decodeHex(String hex)
+    {
+        if(hex.length() % 2 != 0)
+        {
+            return null;
+        }
+        byte[] bytes = new byte[hex.length() / 2];
+        for(int i = 0; i < bytes.length; i++)
+        {
+            int high = Character.digit(hex.charAt(2 * i), 16);
+            int low = Character.digit(hex.charAt(2 * i + 1), 16);
+            if(high < 0 || low < 0)
+            {
+                return null;
+            }
+            bytes[i] = (byte) ((high << 4) | low);
+        }
+        return bytes;
+    }
+
+    /**
+     * Reduces the value of a "filename" property to a name which is safe to create inside a directory.
+     *
+     * @param value the property value, which may be null or of any type
+     * @return the last path element of the value, or null if the value is not a String or leaves no usable name
+     */
+    private static String sanitizeFileName(Object value)
+    {
+        if(!(value instanceof String))
+        {
+            return null;
+        }
+        // File.getName() drops any leading directory components
+        String name = new File((String) value).getName();
+        if(name.length() == 0 || name.equals(".") || name.equals(".."))
+        {
+            return null;
+        }
+        return name;
+    }
+
+    public static void main(String[] args)
+    {
+        new Filereceiver(args).run();
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,276 @@
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+
+/**
+ * Command line tool which sends every file in a directory as a message to an address. The name of the file is
+ * carried in the "filename" application property and the MD5 digest of the name is used as the delivery tag.
+ * <p>
+ * Before it is sent a file is moved into a ".tmp" sub-directory of the source directory; it is deleted from there
+ * when the peer reports the Accepted outcome. Files found in ".tmp" at start-up are passed to the sending link as
+ * unsettled deliveries and are then resumed, or moved back to the source directory to be sent again, depending on
+ * what the peer reports as unsettled.
+ */
+public class Filesender extends Util
+{
+    private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:";
+
+    protected Filesender(String[] args)
+    {
+        super(args);
+    }
+
+    // The has...Option() overrides below select which of the options known to Util apply to this tool
+    // (presumably consulted by Util when it builds the command line parser - confirm against Util).
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return false;
+    }
+
+    /**
+     * Prints the usage message for this tool followed by the description of the given options.
+     *
+     * @param options the command line options to describe
+     */
+    @Override
+    protected void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+
+    }
+
+    /**
+     * Sends the files in the directory given as the second argument to the address given as the first argument,
+     * after first dealing with any files left in the ".tmp" sub-directory by an earlier run.
+     */
+    @Override
+    protected void run()
+    {
+        final String queue = getArgs()[0];
+        final String directoryName = getArgs()[1];
+
+        try
+        {
+            MessageDigest md5 = MessageDigest.getInstance("MD5");
+            Connection conn = newConnection();
+
+            Session session = conn.createSession();
+
+            File directory = new File(directoryName);
+            if(directory.isDirectory() && directory.canWrite())
+            {
+
+                File tmpDirectory = new File(directoryName, ".tmp");
+                if(!tmpDirectory.exists() && !tmpDirectory.mkdir())
+                {
+                    throw new IOException("Unable to create directory: " + tmpDirectory);
+                }
+
+                // File.list() returns null when the path is not a directory or cannot be read
+                String[] unsettledFiles = tmpDirectory.list();
+                if(unsettledFiles == null)
+                {
+                    throw new IOException("Unable to list directory: " + tmpDirectory);
+                }
+
+                Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
+                Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
+                for(String fileName : unsettledFiles)
+                {
+                    File aFile = new File(tmpDirectory, fileName);
+                    if(aFile.canRead() && aFile.canWrite())
+                    {
+                        // must produce the same tag as createMessageFromFile() does for this name
+                        Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
+                        unsettled.put(deliveryTag, null);
+                        unsettledFileNames.put(deliveryTag, fileName);
+                    }
+                }
+
+
+                Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
+                                                unsettled);
+
+                Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled();
+
+                // staged files which the peer does not report as unsettled go back to the source directory, where
+                // the send loop below picks them up again
+                for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet())
+                {
+                    if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
+                    {
+                        File staged = new File(tmpDirectory, entry.getValue());
+                        File restored = new File(directory, entry.getValue());
+                        if(!staged.renameTo(restored))
+                        {
+                            System.err.println("Unable to move " + staged + " to " + restored);
+                        }
+                    }
+                }
+
+                if(remoteUnsettled != null)
+                {
+                    for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet())
+                    {
+                        final String fileName = unsettledFileNames.get(entry.getKey());
+                        if(fileName == null)
+                        {
+                            // the peer reports a delivery for which there is no staged file: nothing to resume
+                            continue;
+                        }
+                        final File unsettledFile = new File(tmpDirectory, fileName);
+
+                        if(entry.getValue() instanceof Accepted)
+                        {
+                            // the peer already accepted the file: settle the delivery and discard the staged copy
+                            Message resumed = new Message();
+                            resumed.setDeliveryTag(entry.getKey());
+                            resumed.setDeliveryState(entry.getValue());
+                            resumed.setResume(Boolean.TRUE);
+                            resumed.setSettled(Boolean.TRUE);
+
+                            unsettledFile.delete();
+
+                            s.send(resumed);
+                        }
+                        else if(entry.getValue() instanceof Received || entry.getValue() == null)
+                        {
+                            // no outcome was reached: send the content again as a resumed delivery
+                            Message resumed = createMessageFromFile(md5, fileName, unsettledFile);
+                            resumed.setResume(Boolean.TRUE);
+                            s.send(resumed, deleteOnAccepted(unsettledFile));
+                        }
+                    }
+                }
+
+
+
+                String[] files = directory.list();
+                if(files == null)
+                {
+                    throw new IOException("Unable to list directory: " + directory);
+                }
+
+                for(String fileName : files)
+                {
+                    final File file = new File(directory, fileName);
+
+                    if(file.canRead() && file.canWrite() && !file.isDirectory())
+                    {
+                        Message message = createMessageFromFile(md5, fileName, file);
+
+                        final File unsettledFile = new File(tmpDirectory, fileName);
+
+                        // only send once the file is staged, otherwise it could neither be deleted on acceptance
+                        // nor be recognised as unsettled by a later run
+                        if(file.renameTo(unsettledFile))
+                        {
+                            s.send(message, deleteOnAccepted(unsettledFile));
+                        }
+                        else
+                        {
+                            System.err.println("Unable to move " + file + " to " + unsettledFile
+                                               + " - file not sent");
+                        }
+                    }
+                }
+
+                s.close();
+            }
+            else
+            {
+                System.err.println("No such directory: " + directory);
+            }
+            session.close();
+            conn.close();
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            e.printStackTrace();
+        }
+        catch (FileNotFoundException e)
+        {
+            e.printStackTrace();
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            e.printStackTrace();
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            e.printStackTrace();
+        }
+
+    }
+
+    /**
+     * Creates the action run when the outcome of a delivery is known: the staged file is deleted if, and only if,
+     * the outcome is Accepted.
+     *
+     * @param unsettledFile the staged copy of the file which was sent
+     * @return the action to pass to the sender along with the message
+     */
+    private static Sender.OutcomeAction deleteOnAccepted(final File unsettledFile)
+    {
+        return new Sender.OutcomeAction()
+        {
+            public void onOutcome(Binary deliveryTag, Outcome outcome)
+            {
+                if(outcome instanceof Accepted)
+                {
+                    unsettledFile.delete();
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds a message from a file: the complete content becomes a Data section, the name is placed in the
+     * "filename" application property and the MD5 digest of the name becomes the delivery tag.
+     *
+     * @param md5      digest used to derive the delivery tag; it is reset before returning
+     * @param fileName the name to put in the message (and to derive the delivery tag from)
+     * @param file     the file to read the content from
+     * @return the message, with its delivery tag set
+     * @throws IOException if the file cannot be read completely or is too large for a single byte array
+     */
+    private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
+    {
+        long length = file.length();
+        if(length > Integer.MAX_VALUE)
+        {
+            throw new IOException("File too large to send as a single message: " + file);
+        }
+        byte[] data = new byte[(int) length];
+
+        FileInputStream fis = new FileInputStream(file);
+        try
+        {
+            // a single read() may return fewer bytes than requested, so keep reading until the array is full
+            int offset = 0;
+            while(offset < data.length)
+            {
+                int read = fis.read(data, offset, data.length - offset);
+                if(read == -1)
+                {
+                    throw new IOException("Unexpected end of file reading " + file);
+                }
+                offset += read;
+            }
+        }
+        finally
+        {
+            fis.close();
+        }
+
+        Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName));
+        Section amqpValue = new Data(new Binary(data));
+        Message message = new Message(Arrays.asList(applicationProperties, amqpValue));
+        // NOTE(review): getBytes() uses the platform default charset, so the tag of a non-ASCII name depends on the
+        // platform; left unchanged so that tags stay the same as those of files staged by earlier runs
+        Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
+        message.setDeliveryTag(deliveryTag);
+        md5.reset();
+        return message;
+    }
+
+    public static void main(String[] args)
+    {
+        new Filesender(args).run();
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A message as handled by the client: a list of payload sections together with the delivery tag, delivery state,
+ * resume flag and settled flag of the delivery which carries it.
+ */
+public class Message
+{
+    private final List<Section> _payload = new ArrayList<Section>();
+    private Binary _deliveryTag;
+    private DeliveryState _deliveryState;
+    private Boolean _resume;
+    private boolean _settled;
+
+    /** Creates a message with an empty payload. */
+    public Message()
+    {
+    }
+
+    /**
+     * Creates a message whose payload is a copy of the given sections, in iteration order.
+     *
+     * @param sections the sections of the message
+     */
+    public Message(Collection<Section> sections)
+    {
+        _payload.addAll(sections);
+    }
+
+    /**
+     * Creates a message whose payload is the single given section.
+     *
+     * @param section the only section of the message
+     */
+    public Message(Section section)
+    {
+        _payload.add(section);
+    }
+
+    /**
+     * Creates a message whose payload is a single AmqpValue section wrapping the given text.
+     *
+     * @param message the text to send
+     */
+    public Message(String message)
+    {
+        this(new AmqpValue(message));
+    }
+
+    /** @return the delivery tag, or null if none has been set */
+    public Binary getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
+    /** @param deliveryTag the delivery tag to associate with this message */
+    public void setDeliveryTag(Binary deliveryTag)
+    {
+        _deliveryTag = deliveryTag;
+    }
+
+    /** @return a read-only view of the payload sections */
+    public List<Section> getPayload()
+    {
+        return Collections.unmodifiableList(_payload);
+    }
+
+    /** @return true only if the resume flag has been set to a true value; an unset flag counts as false */
+    public boolean isResume()
+    {
+        return _resume != null && _resume.booleanValue();
+    }
+
+    /** @param resume the resume flag; may be null to leave the flag unset */
+    public void setResume(final Boolean resume)
+    {
+        _resume = resume;
+    }
+
+    /** @return the delivery state, or null if none has been set */
+    public DeliveryState getDeliveryState()
+    {
+        return _deliveryState;
+    }
+
+    /** @param state the delivery state to associate with this message */
+    public void setDeliveryState(DeliveryState state)
+    {
+        _deliveryState = state;
+    }
+
+    /** @return the settled flag, which is false unless it has been set */
+    public boolean getSettled()
+    {
+        return _settled;
+    }
+
+    /** @param settled the settled flag */
+    public void setSettled(boolean settled)
+    {
+        _settled = settled;
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command line tool which parses encoded values from standard input, or from each of the files named on the
+ * command line, and prints every value together with the name of its class.
+ */
+public class ReadBytes
+{
+
+    /**
+     * Parses and prints the content of each named file, or of standard input when no file is named.
+     *
+     * @param args the names of the files to read; may be empty
+     * @throws IOException        if a file cannot be opened or read
+     * @throws AmqpErrorException if thrown by the value parser
+     */
+    public static void main(String[] args) throws IOException, AmqpErrorException
+    {
+
+        if(args.length == 0)
+        {
+            readBytes(System.in);
+        }
+        else
+        {
+            for(String fileName : args)
+            {
+                System.out.println("=========================== " + fileName + " ===========================");
+                final FileInputStream fis = new FileInputStream(fileName);
+                try
+                {
+                    readBytes(fis);
+                }
+                finally
+                {
+                    // close the file even when reading or parsing fails
+                    fis.close();
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Reads the stream to its end in chunks of up to 4096 bytes, parsing values from each chunk and printing one
+     * line per value.
+     *
+     * @param inputStream the stream to read; it is not closed by this method
+     * @throws IOException        if the stream cannot be read
+     * @throws AmqpErrorException if thrown by the value parser
+     */
+    private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException
+    {
+        byte[] bytes = new byte[4096];
+
+        ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance());
+
+        int count;
+
+        while((count = inputStream.read(bytes))!=-1)
+        {
+            // expose only the bytes filled by this read
+            // NOTE(review): each chunk is parsed on its own; whether parse() copes with a value which spans two
+            // chunks cannot be told from here - confirm against ValueHandler
+            ByteBuffer buf = ByteBuffer.wrap(bytes, 0, count);
+            while(buf.hasRemaining())
+            {
+
+                    final Object value = valueHandler.parse(buf);
+                    System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n");
+
+            }
+        }
+
+    }
+
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.commons.cli.*;
+
+public class Receive extends Util
+{
+    private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:";
+    private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L);
+    private UnsignedLong _lastCorrelationId;
+
+    public static void main(String[] args)
+    {
+        new Receive(args).run();
+    }
+
+
+    public Receive(final String[] args)
+    {
+        super(args);
+    }
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasWindowSizeOption()
+    {
+        return true;
+    }
+
+    protected void run()
+    {
+
+        try
+        {
+            final String queue = getArgs()[0];
+
+            String message = "";
+
+            Connection conn = newConnection();
+
+
+            Session session = conn.createSession();
+
+
+
+            Receiver r = session.createReceiver(queue, getMode(), getLinkName(), isDurableLink());
+            Transaction txn = null;
+
+            int credit = 0;
+            int receivedCount  = 0;
+
+            if(!useStdIn())
+            {
+                if(getArgs().length <= 2)
+                {
+
+                    Transaction txn2 = null;
+                    if(useTran())
+                    {
+                        txn = session.createSessionLocalTransaction();
+                        txn2 = session.createSessionLocalTransaction();
+                    }
+
+                    for(int i = 0; i < getCount(); i++)
+                    {
+
+                        if(credit == 0)
+                        {
+                            if(getCount() - i <= getWindowSize())
+                            {
+                                credit = getCount() - i;
+
+                            }
+                            else
+                            {
+                                credit = getWindowSize();
+
+                            }
+
+                            {
+                                r.setCredit(UnsignedInteger.valueOf(credit), false);
+                            }
+                            if(!isBlock())
+                                r.drain();
+                        }
+
+                        Message m = isBlock() ? r.receive() : r.receive(1000L);
+                        credit--;
+                        if(m==null)
+                        {
+                            break;
+                        }
+
+
+
+                        r.acknowledge(m.getDeliveryTag(),txn);
+
+                        receivedCount++;
+
+                        System.out.println("Received Message : " + m.getPayload());
+                    }
+
+                    if(useTran())
+                    {
+                        txn.commit();
+                    }
+                }
+                else
+                {
+                    // TODO
+                }
+            }
+            else
+            {
+                // TODO
+            }
+            r.close();
+            session.close();
+            conn.close();
+            System.out.println("Total Messages Received: " + receivedCount);
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+    }
+
+    protected void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+    }
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Client side view of a receiving link.
+ * <p>
+ * Wraps a {@link ReceivingLinkEndpoint}, queues every incoming {@link Transfer} and reassembles
+ * the transfers of a delivery into a {@link Message}. Also lets the application issue credit,
+ * acknowledge deliveries and be called back when a delivery is settled.
+ */
+public class Receiver implements DeliveryStateHandler
+{
+    private final ReceivingLinkEndpoint _endpoint;
+    private final Session _session;
+
+    /** Transfers handed over by the link listener and not yet consumed by {@code receive}. */
+    private final Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
+    /** Callbacks to run once the delivery with the given tag is reported as settled. */
+    private final Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+
+
+
+    /**
+     * Creates a receiver with no distribution mode and {@link AcknowledgeMode#ALO}.
+     *
+     * @param session    session the link is created on
+     * @param linkName   name of the link
+     * @param targetAddr address set on the link target
+     * @param sourceAddr address set on the link source
+     */
+    public Receiver(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+    {
+        this(session, linkName, targetAddr, sourceAddr, null);
+    }
+
+    /**
+     * Creates a receiver with {@link AcknowledgeMode#ALO}.
+     *
+     * @param session    session the link is created on
+     * @param linkName   name of the link
+     * @param targetAddr address set on the link target
+     * @param sourceAddr address set on the link source
+     * @param mode       distribution mode set on the source; may be {@code null}
+     */
+    public Receiver(final Session session, final String linkName, final String targetAddr, final String sourceAddr, DistributionMode mode)
+    {
+        this(session, linkName, targetAddr, sourceAddr, mode, AcknowledgeMode.ALO);
+    }
+
+
+    /**
+     * Creates a non-durable receiver from plain addresses.
+     *
+     * @param session    session the link is created on
+     * @param linkName   name of the link
+     * @param targetAddr address set on the link target
+     * @param sourceAddr address set on the link source
+     * @param distMode   distribution mode set on the source
+     * @param ackMode    selects the sender / receiver settle modes of the link
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final String targetAddr,
+                    final String sourceAddr,
+                    final DistributionMode distMode,
+                    final AcknowledgeMode ackMode)
+    {
+        this(session, linkName, createTarget(targetAddr), createSource(sourceAddr, distMode), ackMode);
+
+    }
+
+    /**
+     * Creates a receiver from plain addresses, optionally durable.
+     *
+     * @param session    session the link is created on
+     * @param linkName   name of the link
+     * @param targetAddr address set on the link target
+     * @param sourceAddr address set on the link source
+     * @param distMode   distribution mode set on the source
+     * @param ackMode    selects the sender / receiver settle modes of the link
+     * @param isDurable  whether the source is marked durable and never expiring
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final String targetAddr,
+                    final String sourceAddr,
+                    final DistributionMode distMode,
+                    final AcknowledgeMode ackMode,
+                    boolean isDurable)
+    {
+        this(session, linkName, createTarget(targetAddr), createSource(sourceAddr, distMode), ackMode, isDurable);
+    }
+
+
+    /**
+     * Creates a receiver from plain addresses, optionally durable, with local unsettled state.
+     *
+     * @param session    session the link is created on
+     * @param linkName   name of the link
+     * @param targetAddr address set on the link target
+     * @param sourceAddr address set on the link source
+     * @param distMode   distribution mode set on the source
+     * @param ackMode    selects the sender / receiver settle modes of the link
+     * @param isDurable  whether the source is marked durable and never expiring
+     * @param unsettled  local unsettled map handed to the endpoint before attaching
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final String targetAddr,
+                    final String sourceAddr,
+                    final DistributionMode distMode,
+                    final AcknowledgeMode ackMode,
+                    boolean isDurable,
+                    Map<Binary, Outcome> unsettled)
+    {
+        this(session, linkName, createTarget(targetAddr), createSource(sourceAddr, distMode), ackMode, isDurable,
+             unsettled);
+    }
+
+
+    /** Builds a source with the given address and distribution mode. */
+    private static Source createSource(final String sourceAddr, final DistributionMode distMode)
+    {
+        Source source = new Source();
+        source.setAddress(sourceAddr);
+        source.setDistributionMode(distMode);
+        return source;
+    }
+
+    /** Builds a target with the given address. */
+    private static Target createTarget(final String targetAddr)
+    {
+        Target target = new Target();
+        target.setAddress(targetAddr);
+        return target;
+    }
+
+    /**
+     * Creates a non-durable receiver from a prepared target and source.
+     *
+     * @param session  session the link is created on
+     * @param linkName name of the link
+     * @param target   link target
+     * @param source   link source
+     * @param ackMode  selects the sender / receiver settle modes of the link
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode)
+    {
+        this(session, linkName, target, source, ackMode, false);
+    }
+
+    /**
+     * Creates a receiver from a prepared target and source, with no local unsettled state.
+     *
+     * @param session   session the link is created on
+     * @param linkName  name of the link
+     * @param target    link target
+     * @param source    link source
+     * @param ackMode   selects the sender / receiver settle modes of the link
+     * @param isDurable whether the source is marked durable and never expiring
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode,
+                    boolean isDurable)
+    {
+        this(session,linkName,target,source,ackMode,isDurable,null);
+    }
+
+    /**
+     * Creates the receiving link endpoint, configures it, attaches it and blocks until the
+     * endpoint reports that it is attached.
+     *
+     * @param session   session the link is created on
+     * @param linkName  name of the link
+     * @param target    link target
+     * @param source    link source; modified in place when {@code isDurable} is set
+     * @param ackMode   selects the sender / receiver settle modes of the link
+     * @param isDurable whether the source is marked durable and never expiring
+     * @param unsettled local unsettled map handed to the endpoint before attaching; may be {@code null}
+     */
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode,
+                    final boolean isDurable,
+                    final Map<Binary,Outcome> unsettled)
+    {
+
+        _session = session;
+        if(isDurable)
+        {
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        }
+        _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
+                                                                      UnsignedInteger.ZERO);
+
+        _endpoint.setDeliveryStateHandler(this);
+
+        switch(ackMode)
+        {
+            case ALO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case AMO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case EO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+                break;
+
+        }
+
+        _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
+        {
+            @Override public void messageTransfer(final Transfer xfr)
+            {
+                _prefetchQueue.add(xfr);
+            }
+        });
+
+        _endpoint.setLocalUnsettled(unsettled);
+        _endpoint.attach();
+
+
+        // Remember an interrupt instead of swallowing it; it is re-asserted once attached.
+        boolean interrupted = false;
+        synchronized(_endpoint.getLock())
+        {
+            // NOTE(review): this only exits once the endpoint is attached; if the attach is
+            // refused the loop presumably never ends - confirm whether
+            // "!isAttached() && !isDetached()" was intended.
+            while(!_endpoint.isAttached() || _endpoint.isDetached())
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
+            }
+        }
+        if(interrupted)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Sets the link credit on the endpoint together with its credit-window flag.
+     *
+     * @param credit the credit to set
+     * @param window value passed to the endpoint's credit-window setting
+     */
+    public void setCredit(UnsignedInteger credit, boolean window)
+    {
+        _endpoint.setLinkCredit(credit);
+        _endpoint.setCreditWindow(window);
+
+    }
+
+
+    /**
+     * @return the address of the endpoint's source
+     */
+    public String getAddress()
+    {
+        return ((Source)_endpoint.getSource()).getAddress();
+    }
+
+    /**
+     * Receives the next message, waiting without a time limit.
+     *
+     * @return the message, or {@code null} if the endpoint is drained or the thread is interrupted
+     */
+    public Message receive()
+    {
+        return receive(-1L);
+    }
+
+    /**
+     * Receives the next message.
+     *
+     * @param wait {@code true} to wait without a time limit, {@code false} to return immediately
+     * @return the message, or {@code null} if none is available
+     */
+    public Message receive(boolean wait)
+    {
+        return receive(wait ? -1L : 0L);
+    }
+
+    /**
+     * Receives the next complete, non-aborted message.
+     *
+     * @param wait maximum time to wait in milliseconds; 0 means no wait, -1 wait forever
+     * @return the message, or {@code null} if none became available in time, the endpoint is
+     *         drained, or the waiting thread is interrupted
+     */
+    public Message receive(long wait)
+    {
+        final long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+
+        Transfer xfr;
+        while((xfr = receiveFromPrefetch(wait)) != null)
+        {
+            if(!Boolean.TRUE.equals(xfr.getAborted()))
+            {
+                final Message m = assembleMessage(xfr);
+                if(m != null)
+                {
+                    return m;
+                }
+            }
+
+            // the delivery was aborted or incomplete: try the next one with whatever time is left
+            if(wait > 0L)
+            {
+                wait = endTime - System.currentTimeMillis();
+                if(wait <= 0L)
+                {
+                    break;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Collects the payload of a delivery, following its continuation transfers, and decodes it.
+     *
+     * @param first the first, non-aborted transfer of the delivery
+     * @return the decoded message, or {@code null} if the delivery was aborted or a continuation
+     *         transfer could not be obtained
+     */
+    private Message assembleMessage(final Transfer first)
+    {
+        final Binary deliveryTag = first.getDeliveryTag();
+        final Boolean resume = first.getResume();
+
+        final List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+        int totalSize = 0;
+
+        Transfer xfr = first;
+        while(true)
+        {
+            final ByteBuffer buf = xfr.getPayload();
+            if(buf != null)
+            {
+                totalSize += buf.remaining();
+                payloads.add(buf);
+            }
+
+            if(!Boolean.TRUE.equals(xfr.getMore()))
+            {
+                break;
+            }
+
+            // Continuation transfers are awaited without a time limit, as before. A missing
+            // transfer used to cause a NullPointerException; the partial delivery is now dropped.
+            xfr = receiveFromPrefetch(-1L);
+            if(xfr == null || Boolean.TRUE.equals(xfr.getAborted()))
+            {
+                return null;
+            }
+        }
+
+        final ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
+        for(ByteBuffer payload : payloads)
+        {
+            allPayload.put(payload);
+        }
+        allPayload.flip();
+
+        final SectionDecoder decoder = _session.getSectionDecoder();
+
+        List<Section> sections;
+        try
+        {
+            sections = decoder.parseAll(allPayload);
+        }
+        catch (AmqpErrorException e)
+        {
+            // an undecodable payload still yields a message, with no sections
+            e.printStackTrace();
+            sections = new ArrayList<Section>();
+        }
+
+        final Message m = new Message(sections);
+        m.setDeliveryTag(deliveryTag);
+        m.setResume(resume);
+        return m;
+    }
+
+    /**
+     * Removes and returns the next queued transfer, waiting on the endpoint lock if necessary.
+     *
+     * @param wait maximum time to wait in milliseconds; 0 means no wait, negative wait forever
+     * @return the transfer, or {@code null} if none arrived in time, the endpoint is drained, or
+     *         the thread was interrupted (its interrupt status is then set)
+     */
+    private Transfer receiveFromPrefetch(long wait)
+    {
+        final long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            Transfer xfr;
+            while((xfr = _prefetchQueue.poll()) == null && !_endpoint.isDrained())
+            {
+                if(wait == 0L)
+                {
+                    // Object.wait(0) would block indefinitely, so "no wait" must not call it
+                    break;
+                }
+
+                try
+                {
+                    if(wait > 0L)
+                    {
+                        lock.wait(wait);
+                    }
+                    else
+                    {
+                        lock.wait();
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    // stop waiting and leave the interrupt visible to the caller
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+
+                if(wait > 0L)
+                {
+                    wait = endTime - System.currentTimeMillis();
+                    if(wait <= 0L)
+                    {
+                        // time is up: take a transfer that arrived during the last wait, if any
+                        xfr = _prefetchQueue.poll();
+                        break;
+                    }
+                }
+            }
+
+            return xfr;
+        }
+    }
+
+
+    /**
+     * Accepts the given message outside any transaction.
+     *
+     * @param m the message to acknowledge
+     */
+    public void acknowledge(final Message m)
+    {
+        acknowledge(m.getDeliveryTag());
+    }
+
+    /**
+     * Accepts the given message outside any transaction.
+     *
+     * @param m the message to acknowledge
+     * @param a callback registered for settlement when the disposition is sent unsettled
+     */
+    public void acknowledge(final Message m, SettledAction a)
+    {
+        acknowledge(m.getDeliveryTag(), a);
+    }
+
+
+    /**
+     * Accepts the given message, under the given transaction if one is supplied.
+     *
+     * @param m   the message to acknowledge
+     * @param txn the transaction, or {@code null} for none
+     */
+    public void acknowledge(final Message m, Transaction txn)
+    {
+        acknowledge(m.getDeliveryTag(), txn);
+    }
+
+
+    /**
+     * Accepts the delivery with the given tag outside any transaction.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     */
+    public void acknowledge(final Binary deliveryTag)
+    {
+        acknowledge(deliveryTag, null, null);
+    }
+
+
+    /**
+     * Accepts the delivery with the given tag outside any transaction.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param a           callback registered for settlement when the disposition is sent unsettled
+     */
+    public void acknowledge(final Binary deliveryTag, SettledAction a)
+    {
+        acknowledge(deliveryTag, null, a);
+    }
+
+    /**
+     * Accepts the delivery with the given tag, under the given transaction if one is supplied.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param txn         the transaction, or {@code null} for none
+     */
+    public void acknowledge(final Binary deliveryTag, final Transaction txn)
+    {
+        acknowledge(deliveryTag, txn, null);
+    }
+
+
+    /**
+     * Sends an {@link Accepted} disposition for the delivery, wrapped in a
+     * {@link TransactionalState} when a transaction is supplied.
+     * <p>
+     * The disposition is sent settled only when there is no transaction and the receiving settle
+     * mode is not {@link ReceiverSettleMode#SECOND}. Otherwise a non-null {@code action} is
+     * remembered and invoked by {@link #handle} once the delivery is reported settled.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param txn         the transaction, or {@code null} for none
+     * @param action      settlement callback, or {@code null} for none
+     */
+    public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
+    {
+
+        final Accepted accepted = new Accepted();
+        final DeliveryState state;
+        if(txn != null)
+        {
+            TransactionalState txnState = new TransactionalState();
+            txnState.setOutcome(accepted);
+            txnState.setTxnId(txn.getTxnId());
+            state = txnState;
+        }
+        else
+        {
+            state = accepted;
+        }
+        final boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+        if(!settled && action != null)
+        {
+            _unsettledMap.put(deliveryTag, action);
+        }
+
+        _endpoint.updateDisposition(deliveryTag,state, settled);
+    }
+
+    /**
+     * Calls {@link #acknowledgeAll(Binary)} with the delivery tag of the given message.
+     *
+     * @param m the message whose delivery tag is used
+     */
+    public void acknowledgeAll(Message m)
+    {
+        acknowledgeAll(m.getDeliveryTag());
+    }
+
+    /**
+     * Asks the endpoint to update all dispositions for the given tag to a settled {@link Accepted}.
+     *
+     * @param deliveryTag the delivery tag passed to the endpoint
+     */
+    public void acknowledgeAll(Binary deliveryTag)
+    {
+        _endpoint.updateAllDisposition(deliveryTag, new Accepted(), true);
+    }
+
+    /**
+     * Clears the endpoint's target and detaches the link.
+     */
+    public void close()
+    {
+        _endpoint.setTarget(null);
+        _endpoint.detach();
+    }
+
+    /**
+     * Asks the endpoint to drain the link.
+     */
+    public void drain()
+    {
+        _endpoint.drain();
+    }
+
+    /**
+     * Sets the link credit and the transaction id on the endpoint, with the credit-window flag
+     * switched off.
+     *
+     * @param credit the credit to set
+     * @param txn    the transaction whose id is set on the endpoint, or {@code null} to clear it
+     */
+    public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
+    {
+        _endpoint.setLinkCredit(credit);
+        _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
+        _endpoint.setCreditWindow(false);
+
+    }
+
+    /**
+     * Delivery state callback: when the delivery is reported settled, runs and forgets the
+     * callback registered for its tag, if any.
+     *
+     * @param deliveryTag tag of the delivery whose state changed
+     * @param state       the reported state (not inspected here)
+     * @param settled     whether the delivery is settled; {@code null} is treated as not settled
+     */
+    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    {
+        if(Boolean.TRUE.equals(settled))
+        {
+            SettledAction action = _unsettledMap.remove(deliveryTag);
+            if(action != null)
+            {
+                action.onSettled(deliveryTag);
+            }
+        }
+    }
+
+    /**
+     * @return the endpoint's initial unsettled map
+     */
+    public Map<Binary, Outcome> getRemoteUnsettled()
+    {
+        return _endpoint.getInitialUnsettledMap();
+    }
+
+
+    /**
+     * Callback invoked when a delivery acknowledged through this receiver is reported settled.
+     */
+    public static interface SettledAction
+    {
+        /**
+         * @param deliveryTag tag of the delivery which has been settled
+         */
+        public void onSettled(Binary deliveryTag);
+    }
+}
\ No newline at end of file

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+
+/**
+ * Command line tool which sends request messages to an address, each carrying the address of a
+ * temporary queue receiver as its reply-to, and collects the responses arriving there.
+ */
+public class Request extends Util
+{
+    private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
+
+    /**
+     * Entry point: builds a {@code Request} from the command line and runs it.
+     *
+     * @param args command line options followed by the address and optional content
+     */
+    public static void main(String[] args)
+    {
+        new Request(args).run();
+    }
+
+    /**
+     * @param args command line arguments, passed unchanged to {@link Util}
+     */
+    public Request(String[] args)
+    {
+        super(args);
+    }
+
+    // ---- command line options this tool enables -----------------------------------------------
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasWindowSizeOption()
+    {
+        return true;
+    }
+
+    /**
+     * Sends {@code getCount()} copies of the content argument to the address in the first
+     * argument and waits until a response has been received for every request that was sent.
+     * <p>
+     * The content is padded with '.' up to {@code getMessageSize()} characters. Responses are
+     * received on a temporary queue receiver, created on a second connection when
+     * {@code isUseMultipleConnections()} is set. When transactions are enabled the sends, and
+     * the acknowledgements made while sending, share one transaction which is committed once
+     * all requests are sent. Without a content argument nothing is sent and nothing is awaited.
+     */
+    public void run()
+    {
+
+        try
+        {
+            final String queue = getArgs()[0];
+
+            String message = "";
+
+            Connection conn = newConnection();
+            Session session = conn.createSession();
+
+            Connection conn2;
+            Session session2;
+            Receiver responseReceiver;
+
+            if(isUseMultipleConnections())
+            {
+                conn2 = newConnection();
+                session2 = conn2.createSession();
+                responseReceiver = session2.createTemporaryQueueReceiver();
+            }
+            else
+            {
+                conn2 = null;
+                session2 = null;
+                responseReceiver = session.createTemporaryQueueReceiver();
+            }
+
+            responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+            Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+            Transaction txn = null;
+
+            if(useTran())
+            {
+                txn = session.createSessionLocalTransaction();
+            }
+
+            int sent = 0;
+            int received = 0;
+
+            if(getArgs().length >= 2)
+            {
+                message = getArgs()[1];
+                if(message.length() < getMessageSize())
+                {
+                    StringBuilder builder = new StringBuilder(getMessageSize());
+                    builder.append(message);
+                    for(int x = message.length(); x < getMessageSize(); x++)
+                    {
+                        builder.append('.');
+                    }
+                    message = builder.toString();
+                }
+
+                for(int i = 0; i < getCount(); i++)
+                {
+                    Properties properties = new Properties();
+                    properties.setMessageId(UnsignedLong.valueOf(i));
+                    properties.setReplyTo(responseReceiver.getAddress());
+
+                    AmqpValue amqpValue = new AmqpValue(message);
+                    Section[] sections = { new Header() , properties, amqpValue};
+                    final Message message1 = new Message(Arrays.asList(sections));
+
+                    s.send(message1, txn);
+                    sent++;
+
+                    // pick up a response if one is offered, without holding up the next send
+                    Message responseMessage = responseReceiver.receive(false);
+                    if(responseMessage != null)
+                    {
+                        responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
+                        received++;
+                    }
+                }
+            }
+
+            if(txn != null)
+            {
+                txn.commit();
+            }
+
+            // Wait only for responses to requests that were actually sent; waiting for
+            // getCount() responses would block forever when no content argument was given.
+            while(received < sent)
+            {
+                Message responseMessage = responseReceiver.receive();
+                if(responseMessage == null)
+                {
+                    // the receiver gave up; no further responses can be collected
+                    break;
+                }
+                responseReceiver.acknowledge(responseMessage.getDeliveryTag());
+                received++;
+            }
+
+            s.close();
+            session.close();
+            conn.close();
+
+            if(session2 != null)
+            {
+                session2.close();
+                conn2.close();
+            }
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+    }
+
+    /**
+     * @return {@code true}; this tool enables the single-link-per-connection mode
+     */
+    protected boolean hasSingleLinkPerConnectionMode()
+    {
+        return true;
+    }
+
+    /**
+     * Prints the usage string of this tool followed by the supported options.
+     *
+     * @param options the options to describe
+     */
+    protected void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+    }
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.*;
+
+/**
+ * Command line tool that receives messages from the address given as the first
+ * argument and, for every message received, sends a reply built from the
+ * request's own sections to the request's reply-to address (or to the response
+ * queue option when one has been set).
+ */
+public class Respond extends Util
+{
+    private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:";
+    private Connection _conn;
+    private Session _session;
+    private Receiver _receiver;
+    /** Transaction used for replies and acknowledgements; null when not transactional. */
+    private Transaction _txn;
+    /** Reply senders created so far, keyed by the reply address they send to. */
+    private Map<String,Sender> _senders;
+    /** Message id given to the next reply; incremented after every reply built. */
+    private UnsignedLong _responseMsgId = UnsignedLong.ZERO;
+    /** Second connection/session, only created when isUseMultipleConnections() is true. */
+    private Connection _conn2;
+    private Session _session2;
+
+    public Respond(final String[] args)
+    {
+        super(args);
+    }
+
+    // The has...Option() overrides below select which options this tool exposes.
+    // NOTE(review): presumably consulted by Util when it builds the command line
+    // options - confirm against Util.
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasSingleLinkPerConnectionMode()
+    {
+        return true;
+    }
+
+
+    @Override
+    protected boolean hasWindowSizeOption()
+    {
+        return true;
+    }
+
+    public static void main(String[] args)
+    {
+        new Respond(args).run();
+    }
+
+    /**
+     * Receives messages from the address in the first argument and responds to
+     * each of them until getCount() messages have been counted or a receive
+     * returns no message.
+     *
+     * When useTran() is true the replies and acknowledgements are grouped into
+     * transactions of getBatchSize() messages; a completed batch is committed,
+     * or - with probability getRollbackRatio() - rolled back and replayed until
+     * it finally commits. All senders, the receiver, and every session and
+     * connection that was opened are closed before returning normally.
+     */
+    public void run()
+    {
+        try
+        {
+
+            _senders = new HashMap<String, Sender>();
+
+            final String queue = getArgs()[0];
+
+            _conn = newConnection();
+
+            if(isUseMultipleConnections())
+            {
+                _conn2 = newConnection();
+                _session2 = _conn2.createSession();
+            }
+
+            _session = _conn.createSession();
+
+            _receiver = _session.createReceiver(queue, getMode());
+            _txn = null;
+
+            int credit = 0;
+            int receivedCount  = 0;
+            _responseMsgId = UnsignedLong.ZERO;
+
+            Random random = null;
+            int batch = 0;
+            List<Message> txnMessages = null;
+            if(useTran())
+            {
+                // The random source is only needed when rollbacks can happen.
+                if(getRollbackRatio() != 0)
+                {
+                    random = new Random();
+                }
+                batch = getBatchSize();
+                _txn = _session.createSessionLocalTransaction();
+                txnMessages = new ArrayList<Message>(batch);
+            }
+
+            // i counts the messages taken off the link; receivedCount only counts
+            // messages whose handling is final (committed when transactional).
+            for(int i = 0; receivedCount < getCount(); i++)
+            {
+
+                if(credit == 0)
+                {
+                    // Never ask for more than is still outstanding, nor for more than one window.
+                    credit = Math.min(getCount() - i, getWindowSize());
+
+                    _receiver.setCredit(UnsignedInteger.valueOf(credit), false);
+
+                    if(!isBlock())
+                    {
+                        _receiver.drain();
+                    }
+                }
+
+                final Message m;
+                if(isBlock())
+                {
+                    // Wait without a timeout for the very first message, then give up after 10s of silence.
+                    m = receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L);
+                }
+                else
+                {
+                    m = _receiver.receive(1000L);
+                }
+                credit--;
+                if(m==null)
+                {
+                    // No message arrived: commit whatever part of a batch has been handled and stop.
+                    if(useTran() && batch != getBatchSize())
+                    {
+                        _txn.commit();
+                        // The partial batch has been committed, so it counts as received.
+                        receivedCount += txnMessages.size();
+                        txnMessages.clear();
+                    }
+                    break;
+                }
+
+                System.out.println("Received Message: " + m.getPayload());
+
+                respond(m);
+
+                if(useTran())
+                {
+                    // Remember the request so that it can be replayed after a rollback.
+                    txnMessages.add(m);
+
+                    if(--batch == 0)
+                    {
+
+                        if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio())
+                        {
+                            _txn.commit();
+                            txnMessages.clear();
+                            receivedCount += getBatchSize();
+                        }
+                        else
+                        {
+                            System.out.println("Random Rollback");
+                            _txn.rollback();
+                            double result;
+                            // Replay the whole batch in a fresh transaction until a replay commits.
+                            do
+                            {
+                                _txn = _session.createSessionLocalTransaction();
+
+                                for(Message msg : txnMessages)
+                                {
+                                    respond(msg);
+                                }
+
+                                result = random.nextDouble();
+                                if(result<getRollbackRatio())
+                                {
+                                    _txn.rollback();
+                                }
+                                else
+                                {
+                                    _txn.commit();
+                                    txnMessages.clear();
+                                    receivedCount += getBatchSize();
+                                }
+                            }
+                            while(result < getRollbackRatio());
+                        }
+                        // Start the transaction for the next batch.
+                        _txn = _session.createSessionLocalTransaction();
+
+                        batch = getBatchSize();
+                    }
+                }
+                else
+                {
+                    receivedCount++;
+                }
+
+            }
+
+
+            for(Sender s : _senders.values())
+            {
+                s.close();
+            }
+
+            _receiver.close();
+            _session.close();
+            _conn.close();
+            System.out.println("Received: " + receivedCount);
+
+            // Also release the second session/connection opened in multiple-connection mode.
+            if(_session2 != null)
+            {
+                _session2.close();
+                _conn2.close();
+            }
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+    }
+
+    /**
+     * Sends a reply for the given request (when a reply address is known) and
+     * acknowledges the request, both under the current transaction (which is
+     * null when the tool is not running transactionally).
+     *
+     * The reply address is the response queue option when set, otherwise the
+     * reply-to of the request's first Properties section. The reply carries a
+     * copy of the request's sections in which every Properties section is
+     * replaced by a new one holding the reply address, the request's message id
+     * as correlation id, and the next response message id.
+     *
+     * @param m the request message to respond to
+     * @throws Sender.SenderCreationException if a sender for the reply address cannot be created
+     */
+    private void respond(Message m) throws Sender.SenderCreationException
+    {
+        List<Section> sections = m.getPayload();
+        String replyTo = null;
+        Object correlationId = null;
+        // Only the first Properties section is consulted for the reply address and message id.
+        for(Section section : sections)
+        {
+            if(section instanceof Properties)
+            {
+                replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue();
+                correlationId = ((Properties) section).getMessageId();
+                break;
+            }
+        }
+
+        if(replyTo != null)
+        {
+            // Senders are created lazily and reused, one per reply address.
+            Sender s = _senders.get(replyTo);
+            if(s == null)
+            {
+                s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize());
+                _senders.put(replyTo, s);
+            }
+
+            List<Section> replySections = new ArrayList<Section>(sections);
+
+            ListIterator<Section> sectionIterator = replySections.listIterator();
+
+            while(sectionIterator.hasNext())
+            {
+                Section section = sectionIterator.next();
+                if(section instanceof Properties)
+                {
+                    Properties newProps = new Properties();
+                    newProps.setTo(replyTo);
+                    newProps.setCorrelationId(correlationId);
+                    newProps.setMessageId(_responseMsgId);
+                    _responseMsgId = _responseMsgId.add(UnsignedLong.ONE);
+                    sectionIterator.set(newProps);
+                }
+            }
+
+            Message replyMessage = new Message(replySections);
+            System.out.println("Sent Message: " + replySections);
+            s.send(replyMessage, _txn);
+
+        }
+        // The request is acknowledged even when there is nowhere to send a reply.
+        _receiver.acknowledge(m.getDeliveryTag(), _txn);
+    }
+
+    /**
+     * Prints the command line help for this tool: USAGE_STRING is used as the
+     * syntax line and is followed by the description of the supplied options.
+     *
+     * @param options the command line options to describe
+     */
+    protected  void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+    }
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.util.Arrays;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+/**
+ * Command line tool that sends messages to the address given as the first
+ * argument. The content comes from standard input (one message per line),
+ * from the remaining arguments, or is generated getCount() times from an
+ * optional single content argument.
+ */
+public class Send extends Util
+{
+    private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
+
+
+    public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException
+    {
+        new Send(args).run();
+    }
+
+
+    public Send(final String[] args)
+    {
+        super(args);
+    }
+
+    // The has...Option() overrides below select which options this tool exposes.
+    // NOTE(review): presumably consulted by Util when it builds the command line
+    // options - confirm against Util.
+
+    @Override
+    protected boolean hasLinkDurableOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasLinkNameOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasResponseQueueOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasSizeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasBlockOption()
+    {
+        return false;
+    }
+
+    @Override
+    protected boolean hasStdInOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasTxnOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasModeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasCountOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasWindowSizeOption()
+    {
+        return true;
+    }
+
+    @Override
+    protected boolean hasSubjectOption()
+    {
+        return true;
+    }
+
+    /**
+     * Opens a connection, session and sender for the address in the first
+     * argument and sends the messages:
+     * <ul>
+     * <li>with the stdin option, one message per line read from standard input;</li>
+     * <li>with more than two arguments, one message per argument after the address;</li>
+     * <li>otherwise getCount() generated messages based on the optional second argument.</li>
+     * </ul>
+     * When useTran() is true every send is part of one transaction that is
+     * committed once all messages have been sent. The sender, session and
+     * connection are closed before returning normally.
+     */
+    public void run()
+    {
+
+        final String queue = getArgs()[0];
+
+        String message = "";
+
+        try
+        {
+            Connection conn = newConnection();
+
+            Session session = conn.createSession();
+
+
+            Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
+
+            Transaction txn = null;
+
+            if(useTran())
+            {
+                txn = session.createSessionLocalTransaction();
+            }
+
+            if(!useStdIn())
+            {
+                if(getArgs().length <= 2)
+                {
+                    if(getArgs().length == 2)
+                    {
+                        message = getArgs()[1];
+                    }
+                    for(int i = 0; i < getCount(); i++)
+                    {
+                        s.send(createMessage(message, i), txn);
+                    }
+                }
+                else
+                {
+                    // Several content arguments: each one is sent once, as is.
+                    for(int i = 1; i < getArgs().length; i++)
+                    {
+                        s.send(new Message(getArgs()[i]), txn);
+                    }
+
+                }
+            }
+            else
+            {
+                LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
+
+
+                try
+                {
+                    while((message = buf.readLine()) != null)
+                    {
+                        s.send(new Message(message), txn);
+                    }
+                }
+                catch (IOException e)
+                {
+    // TODO
+                    e.printStackTrace();
+                }
+            }
+
+            if(txn != null)
+            {
+                txn.commit();
+            }
+
+            s.close();
+
+            session.close();
+            conn.close();
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Connection.ConnectionException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+
+    }
+
+    /**
+     * Builds the generated message with the given index: a Properties section
+     * (message id set to the index, subject set when the subject option is
+     * present) followed by a body section. When the text is shorter than
+     * getMessageSize() the body is a Data section holding the text padded with
+     * '.' up to that size; otherwise it is an AmqpValue holding the text.
+     *
+     * @param content the base content of the message
+     * @param index   the position of the message in the run, appended to the content
+     * @return the message to send
+     */
+    private Message createMessage(String content, int index)
+    {
+        Properties properties = new Properties();
+        properties.setMessageId(UnsignedLong.valueOf(index));
+        if(getSubject() != null)
+        {
+            properties.setSubject(getSubject());
+        }
+
+        Section bodySection;
+        // NOTE(review): the padded form separates content and index with two spaces while the
+        // AmqpValue form below uses one - kept as found, confirm which one is intended.
+        // getBytes() encodes with the platform default charset.
+        byte[] textBytes = (content + "  " + index).getBytes();
+        if(textBytes.length < getMessageSize())
+        {
+            byte[] padded = new byte[getMessageSize()];
+            System.arraycopy(textBytes,0,padded,0,textBytes.length);
+            Arrays.fill(padded, textBytes.length, padded.length, (byte) '.');
+            bodySection = new Data(new Binary(padded));
+        }
+        else
+        {
+            bodySection = new AmqpValue(content + " " + index);
+        }
+
+        Section[] sections = {properties, bodySection};
+        return new Message(Arrays.asList(sections));
+    }
+
+    /**
+     * Prints the command line help for this tool: USAGE_STRING is used as the
+     * syntax line and is followed by the description of the supplied options.
+     *
+     * @param options the command line options to describe
+     */
+    protected void printUsage(Options options)
+    {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(USAGE_STRING, options );
+    }
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org