You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2012/08/13 18:54:12 UTC
svn commit: r1372508 -
/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java
Author: rajith
Date: Mon Aug 13 16:54:12 2012
New Revision: 1372508
URL: http://svn.apache.org/viewvc?rev=1372508&view=rev
Log:
NO-JIRA Adding the Fetch example. Looks like I missed this when adding
the other pieces.
Added:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java
Added: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java?rev=1372508&view=auto
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java (added)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Fetch.java Mon Aug 13 16:54:12 2012
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.driver.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.util.UUID;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.logging.LogHandler;
+
+/**
+ * Example client: connects to the server at localhost:5672, configures the
+ * ANONYMOUS SASL mechanism, and logs the messages it receives from a mailbox.
+ * Usage: {@code Fetch mailbox [count]}; count defaults to 1.
+ */
+public class Fetch
+{
+    enum State {NEW, AUTHENTICATING, CONNECTION_UP, FAILED}
+
+    private final LogHandler _logger;
+    private final Driver _driver;
+    private final Connector<State> _ctor;
+    private final Connection _conn;
+    private final Session _ssn;
+    private final Receiver _receiver;
+    private final String _mailbox;
+    private final int _count;
+
+    /**
+     * Sets up the connector, connection, session and receiver link, opens the
+     * endpoints and issues {@code count} credits to the receiver.
+     *
+     * @param mailbox address to receive from
+     * @param count number of messages the server is allowed to send
+     */
+    public Fetch(String mailbox, int count) throws Exception
+    {
+        _mailbox = mailbox;
+        _count = count;
+        // NOTE(review): the "TestServer: " prefix and the "Post" container id
+        // look copied from sibling examples; kept unchanged to preserve behavior.
+        _logger = new SystemOutLogger("TestServer: ");
+        _driver = new DriverImpl(_logger);
+
+        // setup a driver connection to the server
+        _ctor = _driver.createConnector("localhost", 5672, State.NEW);
+
+        // configure SASL
+        _ctor.sasl().setMechanisms(new String[]{"ANONYMOUS"});
+
+        // inform the engine about the connection, and link the driver to it.
+        ConnectionImpl connection = new ConnectionImpl();
+        connection.setLocalContainerId("Post");
+        _conn = connection;
+        _ctor.setConnection(_conn);
+
+        // create a session, and Link for receiving from the mailbox
+        _logger.info("Fetching from mailbox " + mailbox);
+        _ssn = _conn.session();
+        _receiver = _ssn.receiver("receiver");
+        _receiver.setLocalTargetAddress(mailbox);
+
+        // now open all the engine endpoints
+        _conn.open();
+        _ssn.open();
+        _receiver.open();
+
+        // Allow the server to send "count" messages to the receiver by setting
+        // the credit to the expected count
+        _receiver.flow(_count);
+    }
+
+    /** Logs and settles incoming deliveries for as long as the receiver has credit. */
+    private void receive() throws Exception
+    {
+        while (_receiver.getCredit() > 0)
+        {
+            if (_receiver.getQueued() == 0)
+            {
+                // nothing queued on the link yet, so wait for the driver
+                await();
+                continue;
+            }
+
+            Delivery d = _conn.getWorkHead();
+            byte[] readBuf = new byte[1024];
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            // keep reading until recv() stops returning data for this delivery
+            int bytesRead;
+            while ((bytesRead = _receiver.recv(readBuf, 0, readBuf.length)) > 0)
+            {
+                bout.write(readBuf, 0, bytesRead);
+            }
+            // NOTE: the body is decoded with the platform default charset
+            _logger.info("Received Msg : " + new String(bout.toByteArray()));
+            d.disposition(null);
+            d.settle();
+            _receiver.advance();
+        }
+    }
+
+    /** Closes the connection and waits until the remote end reports CLOSED. */
+    private void close() throws Exception
+    {
+        _conn.close();
+        while (_conn.getRemoteState() != EndpointState.CLOSED)
+        {
+            await();
+        }
+        _logger.info("Connection has been closed");
+    }
+
+    /** Runs one process / wait / process cycle on the connector and driver. */
+    private void await() throws Exception
+    {
+        _logger.debug("Waiting for events...");
+
+        // prepare pending outbound data for the network
+        _ctor.process();
+
+        // wait forever for network event(s)
+        _driver.doWait(0);
+
+        // process any data that arrived
+        _ctor.process();
+
+        _logger.debug("...waiting done!");
+    }
+
+    /** Waits for SASL to reach PASS or FAIL; returns true only on PASS. */
+    private boolean authenticate() throws Exception
+    {
+        Sasl sasl = _ctor.sasl();
+        while (sasl.getState() != SaslState.PN_SASL_PASS && sasl.getState() != SaslState.PN_SASL_FAIL)
+        {
+            await();
+        }
+
+        return sasl.getState() == SaslState.PN_SASL_PASS;
+    }
+
+    /**
+     * Entry point: {@code args[0]} is the mailbox, optional {@code args[1]} the
+     * message count (default 1). Prints a usage hint and returns when no
+     * mailbox is given.
+     */
+    public static void main(String[] args) throws Exception
+    {
+        if (args.length == 0)
+        {
+            // stop here: going on would read args[0] from an empty array
+            System.out.println("You need to specify the mailbox and optionally the msg count.");
+            return;
+        }
+
+        int count = 1;
+        if (args.length > 1)
+        {
+            count = Integer.parseInt(args[1]);
+        }
+
+        Fetch fetch = new Fetch(args[0], count);
+
+        if (!fetch.authenticate())
+        {
+            System.out.println("Error: Authentication failure");
+            return;
+        }
+        System.out.println("Authentication sucessful");
+
+        fetch.receive();
+
+        fetch.close();
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org