You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/09 15:24:46 UTC
[10/34] qpid-proton git commit: PROTON-1385: remove proton-j from the
existing repo, it now has its own repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
deleted file mode 100644
index 146ee09..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-/**
- * Interface used to identify classes that can be a child of a reactor.
- */
-public interface ReactorChild {
-
- /** Frees any resources associated with this child. */
- void free();
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
deleted file mode 100644
index e91a0ee..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import java.nio.channels.SelectableChannel;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Extendable;
-
-/**
- * An entity that can be multiplexed using a {@link Selector}.
- * <p>
- * Every selectable is associated with exactly one {@link SelectableChannel}.
- * Selectables may be interested in three kinds of events: read events, write
- * events, and timer events. A selectable will express its interest in these
- * events through the {@link #isReading()}, {@link #isWriting()}, and
- * {@link #getDeadline()} methods.
- * <p>
- * When a read, write, or timer event occurs, the selectable must be notified by
- * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as
- * appropriate.
- *
- * Once a selectable reaches a terminal state (see {@link #isTerminal()}, it
- * will never be interested in events of any kind. When this occurs it should be
- * removed from the Selector and discarded using {@link #free()}.
- */
-public interface Selectable extends ReactorChild, Extendable {
-
- /**
- * A callback that can be passed to the various "on" methods of the
- * selectable - to allow code to be run when the selectable becomes ready
- * for the associated operation.
- */
- interface Callback {
- void run(Selectable selectable);
- }
-
- /**
- * @return <code>true</code> if the selectable is interested in receiving
- * notification (via the {@link #readable()} method that indicate
- * that the associated {@link SelectableChannel} has data ready
- * to be read from it.
- */
- boolean isReading();
-
- /**
- * @return <code>true</code> if the selectable is interested in receiving
- * notifications (via the {@link #writeable()} method that indicate
- * that the associated {@link SelectableChannel} is ready to be
- * written to.
- */
- boolean isWriting();
-
- /**
- * @return a deadline after which this selectable can expect to receive
- * a notification (via the {@link #expired()} method that indicates
- * that the deadline has past. The deadline is expressed in the
- * same format as {@link System#currentTimeMillis()}. Returning
- * a deadline of zero (or a negative number) indicates that the
- * selectable does not wish to be notified of expiry.
- */
- long getDeadline();
-
- /**
- * Sets the value that will be returned by {@link #isReading()}.
- * @param reading
- */
- void setReading(boolean reading);
-
- /**
- * Sets the value that will be returned by {@link #isWriting()}.
- * @param writing
- */
- void setWriting(boolean writing);
-
- /**
- * Sets the value that will be returned by {@link #getDeadline()}.
- * @param deadline
- */
- void setDeadline(long deadline);
-
- /**
- * Registers a callback that will be run when the selectable becomes ready
- * for reading.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onReadable(Callback runnable);
-
- /**
- * Registers a callback that will be run when the selectable becomes ready
- * for writing.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onWritable(Callback runnable);
-
- /**
- * Registers a callback that will be run when the selectable expires.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onExpired(Callback runnable);
-
- /**
- * Registers a callback that will be run when the selectable is notified of
- * an error.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onError(Callback runnable);
-
- /**
- * Registers a callback that will be run when the selectable is notified
- * that it has been released.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onRelease(Callback runnable);
-
- /**
- * Registers a callback that will be run when the selectable is notified
- * that it has been free'd.
- * @param runnable the callback to register. Any previously registered
- * callback will be replaced.
- */
- void onFree(Callback runnable);
-
- /**
- * Notify the selectable that the underlying {@link SelectableChannel} is
- * ready for a read operation.
- */
- void readable();
-
- /**
- * Notify the selectable that the underlying {@link SelectableChannel} is
- * ready for a write operation.
- */
- void writeable();
-
- /** Notify the selectable that it has expired. */
- void expired();
-
- /** Notify the selectable that an error has occurred. */
- void error();
-
- /** Notify the selectable that it has been released. */
- void release();
-
- /** Notify the selectable that it has been free'd. */
- @Override
- void free();
-
- /**
- * Associates a {@link SelectableChannel} with this selector.
- * @param channel
- */
- void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...)
-
- /** @return the {@link SelectableChannel} associated with this selector. */
- SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...)
-
- /**
- * Check if a selectable is registered. This can be used for tracking
- * whether a given selectable has been registerd with an external event
- * loop.
- * <p>
- * <em>Note:</em> the reactor code, currently, does not use this flag.
- * @return <code>true</code>if the selectable is registered.
- */
- boolean isRegistered(); // XXX: unused in C reactor code
-
- /**
- * Set the registered flag for a selectable.
- * <p>
- * <em>Note:</em> the reactor code, currently, does not use this flag.
- * @param registered the value returned by {@link #isRegistered()}
- */
- void setRegistered(boolean registered); // XXX: unused in C reactor code
-
- /**
- * Configure a selectable with a set of callbacks that emit readable,
- * writable, and expired events into the supplied collector.
- * @param collector
- */
- void setCollector(final Collector collector);
-
- /** @return the reactor to which this selectable is a child. */
- Reactor getReactor() ;
-
- /**
- * Terminates the selectable. Once a selectable reaches a terminal state
- * it will never be interested in events of any kind.
- */
- public void terminate() ;
-
- /**
- * @return <code>true</code> if the selectable has reached a terminal state.
- */
- boolean isTerminal();
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
deleted file mode 100644
index 4228a8d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * A multiplexor of instances of {@link Selectable}.
- * <p>
- * Many instances of <code>Selectable</code> can be added to a selector, and
- * the {@link #select(long)} method used to block the calling thread until
- * one of the <code>Selectables</code> becomes read to perform an operation.
- * <p>
- * This class is not thread safe, so only one thread should be manipulating the
- * contents of the selector, or running the {@link #select(long)} method at
- * any given time.
- */
-public interface Selector {
-
- /**
- * Adds a selectable to the selector.
- * @param selectable
- * @throws IOException
- */
- void add(Selectable selectable) throws IOException;
-
- /**
- * Updates the selector to reflect any changes interest by the specified
- * selectable. This is achieved by calling the
- * {@link Selectable#isReading()} and {@link Selectable#isWriting()}
- * methods.
- * @param selectable
- */
- void update(Selectable selectable);
-
- /**
- * Removes a selectable from the selector.
- * @param selectable
- */
- void remove(Selectable selectable);
-
- /**
- * Waits for the specified timeout period for one or more selectables to
- * become ready for an operation. Selectables that become ready are
- * returned by the {@link #readable()}, {@link #writeable()},
- * {@link #expired()}, or {@link #error()} methods.
- *
- * @param timeout the maximum number of milliseconds to block the calling
- * thread waiting for a selectable to become ready for an
- * operation. The value zero is interpreted as check but
- * don't block.
- * @throws IOException
- */
- void select(long timeout) throws IOException;
-
- /**
- * @return the selectables that have become readable since the last call
- * to {@link #select(long)}. Calling <code>select</code> clears
- * any previous values in this set before adding new values
- * corresponding to those selectables that have become readable.
- */
- Iterator<Selectable> readable();
-
- /**
- * @return the selectables that have become writable since the last call
- * to {@link #select(long)}. Calling <code>select</code> clears
- * any previous values in this set before adding new values
- * corresponding to those selectables that have become writable.
- */
- Iterator<Selectable> writeable();
-
- /**
- * @return the selectables that have expired since the last call
- * to {@link #select(long)}. Calling <code>select</code> clears
- * any previous values in this set before adding new values
- * corresponding to those selectables that have now expired.
- */
- Iterator<Selectable> expired();
-
- /**
- * @return the selectables that have encountered an error since the last
- * call to {@link #select(long)}. Calling <code>select</code>
- * clears any previous values in this set before adding new values
- * corresponding to those selectables that have encountered an
- * error.
- */
- Iterator<Selectable> error() ;
-
- /** Frees the resources used by this selector. */
- void free();
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
deleted file mode 100644
index 7fb5964..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.Handler;
-
-/**
- * Represents work scheduled with a {@link Reactor} for execution at
- * some point in the future.
- * <p>
- * Tasks are created using the {@link Reactor#schedule(int, Handler)}
- * method.
- */
-public interface Task extends Extendable {
-
- /**
- * @return the deadline at which the handler associated with the scheduled
- * task should be delivered a {@link Type#TIMER_TASK} event.
- */
- long deadline();
-
- /** @return the reactor that created this task. */
- Reactor getReactor();
-
- /**
- * Cancel the execution of this task. No-op if invoked after the task was already executed.
- */
- void cancel();
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
deleted file mode 100644
index c5abbd8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.impl.ReactorImpl;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class AcceptorImpl implements Acceptor {
-
- private Record attachments = new RecordImpl();
- private final SelectableImpl sel;
- protected static final String CONNECTION_ACCEPTOR_KEY = "pn_reactor_connection_acceptor";
-
- private class AcceptorReadable implements Callback {
- @Override
- public void run(Selectable selectable) {
- Reactor reactor = selectable.getReactor();
- try {
- SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept();
- if (socketChannel == null) {
- throw new ReactorInternalException("Selectable readable, but no socket to accept");
- }
- Handler handler = BaseHandler.getHandler(AcceptorImpl.this);
- if (handler == null) {
- handler = reactor.getHandler();
- }
- Connection conn = reactor.connection(handler);
- Record conn_recs = conn.attachments();
- conn_recs.set(CONNECTION_ACCEPTOR_KEY, Acceptor.class, AcceptorImpl.this);
- InetSocketAddress peerAddr = (InetSocketAddress)socketChannel.getRemoteAddress();
- if (peerAddr != null) {
- Address addr = new Address();
- addr.setHost(peerAddr.getHostString());
- addr.setPort(Integer.toString(peerAddr.getPort()));
- conn_recs.set(ReactorImpl.CONNECTION_PEER_ADDRESS_KEY, Address.class, addr);
- }
- Transport trans = Proton.transport();
- Sasl sasl = trans.sasl();
- sasl.server();
- sasl.setMechanisms("ANONYMOUS");
- sasl.done(SaslOutcome.PN_SASL_OK);
- trans.bind(conn);
- IOHandler.selectableTransport(reactor, socketChannel.socket(), trans);
- } catch(IOException ioException) {
- sel.error();
- }
- }
- }
-
- private static class AcceptorFree implements Callback {
- @Override
- public void run(Selectable selectable) {
- try {
- if (selectable.getChannel() != null) {
- selectable.getChannel().close();
- }
- } catch(IOException ioException) {
- // Ignore - as we can't make the channel any more closed...
- }
- }
- }
-
- protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
- ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel();
- ssc.bind(new InetSocketAddress(host, port));
- sel = ((ReactorImpl)reactor).selectable(this);
- sel.setChannel(ssc);
- sel.onReadable(new AcceptorReadable());
- sel.onFree(new AcceptorFree());
- sel.setReactor(reactor);
- BaseHandler.setHandler(this, handler);
- sel.setReading(true);
- reactor.update(sel);
- }
-
- @Override
- public void close() {
- if (!sel.isTerminal()) {
- Reactor reactor = sel.getReactor();
- try {
- sel.getChannel().close();
- } catch(IOException ioException) {
- // Ignore.
- }
- sel.setChannel(null);
- sel.terminate();
- reactor.update(sel);
- }
- }
-
- // Used for unit tests, where acceptor is bound to an ephemeral port
- public int getPortNumber() throws IOException {
- ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel();
- return ((InetSocketAddress)ssc.getLocalAddress()).getPort();
- }
-
- @Override
- public void free() {
- sel.free();
- }
-
- @Override
- public Record attachments() {
- return attachments;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
deleted file mode 100644
index 1028ae8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-// Java equivalent to pn_io.
-// This is, currently, in the reactor.impl package because it is not
-// used elsewhere in the proton-j codebase. Instead it is present to
-// facilitate mocking of various Java I/O related resources so that
-// the unit tests can check for leaks.
-public interface IO {
-
- Pipe pipe() throws IOException;
-
- Selector selector() throws IOException;
-
- ServerSocketChannel serverSocketChannel() throws IOException;
-
- SocketChannel socketChannel() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
deleted file mode 100644
index 2dd7e1a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.Channel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.reactor.Selector;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class IOHandler extends BaseHandler {
-
- // pni_handle_quiesced from connection.c
- private void handleQuiesced(Reactor reactor, Selector selector) throws IOException {
- // check if we are still quiesced, other handlers of
- // PN_REACTOR_QUIESCED could have produced more events to process
- if (!reactor.quiesced()) return;
- selector.select(reactor.getTimeout());
- reactor.mark();
- Iterator<Selectable> selectables = selector.readable();
- while(selectables.hasNext()) {
- selectables.next().readable();
- }
- selectables = selector.writeable();
- while(selectables.hasNext()) {
- selectables.next().writeable();
- }
- selectables = selector.expired();
- while(selectables.hasNext()) {
- selectables.next().expired();
- }
- selectables = selector.error();
- while(selectables.hasNext()) {
- selectables.next().error();
- }
- reactor.yield();
- }
-
- // pni_handle_open(...) from connection.c
- private void handleOpen(Reactor reactor, Event event) {
- Connection connection = event.getConnection();
- if (connection.getRemoteState() != EndpointState.UNINITIALIZED) {
- return;
- }
- // Outgoing Reactor connections set the virtual host automatically using the
- // following rules:
- String vhost = connection.getHostname();
- if (vhost == null) {
- // setHostname never called, use the host from the connection's
- // socket address as the default virtual host:
- String conAddr = reactor.getConnectionAddress(connection);
- if (conAddr != null) {
- Address addr = new Address(conAddr);
- connection.setHostname(addr.getHost());
- }
- } else if (vhost.isEmpty()) {
- // setHostname called explictly with a null string. This allows
- // the application to completely avoid sending a virtual host
- // name
- connection.setHostname(null);
- } else {
- // setHostname set by application - use it.
- }
- Transport transport = Proton.transport();
- Sasl sasl = transport.sasl();
- sasl.client();
- sasl.setMechanisms("ANONYMOUS");
- transport.bind(connection);
- }
-
- // pni_handle_bound(...) from connection.c
- // If this connection is an outgoing connection - not an incoming
- // connection created by the Acceptor - create a socket connection to
- // the peer address.
- private void handleBound(Reactor reactor, Event event) {
- Connection connection = event.getConnection();
- Record conn_recs = connection.attachments();
- if (conn_recs.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) != null) {
- // Connection was created via the Acceptor, so the socket already
- // exists
- return;
- }
- String url = reactor.getConnectionAddress(connection);
- String hostname = connection.getHostname();
- int port = 5672;
-
- if (url != null) {
- Address address = new Address(url);
- hostname = address.getHost();
- try {
- port = Integer.parseInt(address.getImpliedPort());
- } catch(NumberFormatException nfe) {
- throw new IllegalArgumentException("Not a valid host: " + url, nfe);
- }
- } else if (hostname != null && !hostname.equals("")) {
- // Backward compatibility with old code that illegally overloaded
- // the connection's hostname
- int colonIndex = hostname.indexOf(':');
- if (colonIndex >= 0) {
- try {
- port = Integer.parseInt(hostname.substring(colonIndex+1));
- } catch(NumberFormatException nfe) {
- throw new IllegalArgumentException("Not a valid host: " + hostname, nfe);
- }
- hostname = hostname.substring(0, colonIndex);
- }
- } else {
- throw new IllegalStateException("No address provided for Connection");
- }
-
- Transport transport = event.getConnection().getTransport();
- Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET
- try {
- SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel();
- socketChannel.configureBlocking(false);
- socketChannel.connect(new InetSocketAddress(hostname, port));
- socket = socketChannel.socket();
- } catch(Exception exception) {
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.getSymbol("proton:io"));
- condition.setDescription(exception.getMessage());
- transport.setCondition(condition);
- transport.close_tail();
- transport.close_head();
- transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
- }
- selectableTransport(reactor, socket, transport);
- }
-
- // pni_connection_capacity from connection.c
- private static int capacity(Selectable selectable) {
- Transport transport = ((SelectableImpl)selectable).getTransport();
- int capacity = transport.capacity();
- if (capacity < 0) {
- if (transport.isClosed()) {
- selectable.terminate();
- }
- }
- return capacity;
- }
-
- // pni_connection_pending from connection.c
- private static int pending(Selectable selectable) {
- Transport transport = ((SelectableImpl)selectable).getTransport();
- int pending = transport.pending();
- if (pending < 0) {
- if (transport.isClosed()) {
- selectable.terminate();
- }
- }
- return pending;
- }
-
- // pni_connection_deadline from connection.c
- private static long deadline(SelectableImpl selectable) {
- Reactor reactor = selectable.getReactor();
- Transport transport = selectable.getTransport();
- long deadline = transport.tick(reactor.now());
- return deadline;
- }
-
- // pni_connection_update from connection.c
- private static void update(Selectable selectable) {
- SelectableImpl selectableImpl = (SelectableImpl)selectable;
- int c = capacity(selectableImpl);
- int p = pending(selectableImpl);
- selectable.setReading(c > 0);
- selectable.setWriting(p > 0);
- selectable.setDeadline(deadline(selectableImpl));
- }
-
- // pni_connection_readable from connection.c
- private static Callback connectionReadable = new Callback() {
- @Override
- public void run(Selectable selectable) {
- Reactor reactor = selectable.getReactor();
- Transport transport = ((SelectableImpl)selectable).getTransport();
- int capacity = transport.capacity();
- if (capacity > 0) {
- SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
- try {
- int n = socketChannel.read(transport.tail());
- if (n == -1) {
- transport.close_tail();
- } else {
- transport.process();
- }
- } catch (IOException e) {
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.getSymbol("proton:io"));
- condition.setDescription(e.getMessage());
- transport.setCondition(condition);
- transport.close_tail();
- }
- }
- // (Comment from C code:) occasionally transport events aren't
- // generated when expected, so the following hack ensures we
- // always update the selector
- update(selectable);
- reactor.update(selectable);
- }
- };
-
- // pni_connection_writable from connection.c
- private static Callback connectionWritable = new Callback() {
- @Override
- public void run(Selectable selectable) {
- Reactor reactor = selectable.getReactor();
- Transport transport = ((SelectableImpl)selectable).getTransport();
- int pending = transport.pending();
- if (pending > 0) {
- SocketChannel channel = (SocketChannel)selectable.getChannel();
- try {
- int n = channel.write(transport.head());
- if (n < 0) {
- transport.close_head();
- } else {
- transport.pop(n);
- }
- } catch(IOException ioException) {
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.getSymbol("proton:io"));
- condition.setDescription(ioException.getMessage());
- transport.setCondition(condition);
- transport.close_head();
- }
- }
-
- int newPending = transport.pending();
- if (newPending != pending) {
- update(selectable);
- reactor.update(selectable);
- }
- }
- };
-
- // pni_connection_error from connection.c
- private static Callback connectionError = new Callback() {
- @Override
- public void run(Selectable selectable) {
- Reactor reactor = selectable.getReactor();
- selectable.terminate();
- reactor.update(selectable);
- }
- };
-
- // pni_connection_expired from connection.c
- private static Callback connectionExpired = new Callback() {
- @Override
- public void run(Selectable selectable) {
- Reactor reactor = selectable.getReactor();
- Transport transport = ((SelectableImpl)selectable).getTransport();
- long deadline = transport.tick(reactor.now());
- selectable.setDeadline(deadline);
- int c = capacity(selectable);
- int p = pending(selectable);
- selectable.setReading(c > 0);
- selectable.setWriting(p > 0);
- reactor.update(selectable);
- }
- };
-
- private static Callback connectionFree = new Callback() {
- @Override
- public void run(Selectable selectable) {
- Channel channel = selectable.getChannel();
- if (channel != null) {
- try {
- channel.close();
- } catch(IOException ioException) {
- // Ignore
- }
- }
- }
- };
-
- // pn_reactor_selectable_transport
- // Note the socket argument can, validly be 'null' this is the equivalent of proton-c's PN_INVALID_SOCKET
- protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) {
- Selectable selectable = reactor.selectable();
- selectable.setChannel(socket != null ? socket.getChannel() : null);
- selectable.onReadable(connectionReadable);
- selectable.onWritable(connectionWritable);
- selectable.onError(connectionError);
- selectable.onExpired(connectionExpired);
- selectable.onFree(connectionFree);
- ((SelectableImpl)selectable).setTransport(transport);
- ((TransportImpl)transport).setSelectable(selectable);
- ((TransportImpl)transport).setReactor(reactor);
- update(selectable);
- reactor.update(selectable);
- return selectable;
- }
-
- private void handleTransport(Reactor reactor, Event event) {
- TransportImpl transport = (TransportImpl)event.getTransport();
- Selectable selectable = transport.getSelectable();
- if (selectable != null && !selectable.isTerminal()) {
- update(selectable);
- reactor.update(selectable);
- }
- }
-
- @Override
- public void onUnhandled(Event event) {
- try {
- ReactorImpl reactor = (ReactorImpl)event.getReactor();
- Selector selector = reactor.getSelector();
- if (selector == null) {
- selector = new SelectorImpl(reactor.getIO());
- reactor.setSelector(selector);
- }
-
- Selectable selectable;
- switch(event.getType()) {
- case SELECTABLE_INIT:
- selectable = event.getSelectable();
- selector.add(selectable);
- break;
- case SELECTABLE_UPDATED:
- selectable = event.getSelectable();
- selector.update(selectable);
- break;
- case SELECTABLE_FINAL:
- selectable = event.getSelectable();
- selector.remove(selectable);
- selectable.release();
- break;
- case CONNECTION_LOCAL_OPEN:
- handleOpen(reactor, event);
- break;
- case CONNECTION_BOUND:
- handleBound(reactor, event);
- break;
- case TRANSPORT:
- handleTransport(reactor, event);
- break;
- case TRANSPORT_CLOSED:
- event.getTransport().unbind();
- break;
- case REACTOR_QUIESCED:
- handleQuiesced(reactor, selector);
- break;
- default:
- break;
- }
- } catch(IOException ioException) {
- // XXX: Might not be the right exception type, but at least the exception isn't being swallowed
- throw new ReactorInternalException(ioException);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
deleted file mode 100644
index 6376b16..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-public class IOImpl implements IO {
-
- @Override
- public Pipe pipe() throws IOException {
- return Pipe.open();
- }
-
- @Override
- public Selector selector() throws IOException {
- return Selector.open();
- }
-
- @Override
- public ServerSocketChannel serverSocketChannel() throws IOException {
- return ServerSocketChannel.open();
- }
-
- @Override
- public SocketChannel socketChannel() throws IOException {
- return SocketChannel.open();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
deleted file mode 100644
index 30c8df9..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.Pipe;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.EventType;
-import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.ExtendableAccessor;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.HandlerException;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.ReactorChild;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.reactor.Selector;
-import org.apache.qpid.proton.reactor.Task;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class ReactorImpl implements Reactor, Extendable {
- public static final ExtendableAccessor<Event, Handler> ROOT = new ExtendableAccessor<>(Handler.class);
-
- private CollectorImpl collector;
- private long now;
- private long timeout;
- private Handler global;
- private Handler handler;
- private Set<ReactorChild> children;
- private int selectables;
- private boolean yield;
- private boolean stop;
- private Selectable selectable;
- private EventType previous;
- private Timer timer;
- private final Pipe wakeup;
- private Selector selector;
- private Record attachments;
- private final IO io;
- protected static final String CONNECTION_PEER_ADDRESS_KEY = "pn_reactor_connection_peer_address";
-
- @Override
- public long mark() {
- now = System.currentTimeMillis();
- return now;
- }
-
- @Override
- public long now() {
- return now;
- }
-
- protected ReactorImpl(IO io) throws IOException {
- collector = (CollectorImpl)Proton.collector();
- global = new IOHandler();
- handler = new BaseHandler();
- children = new HashSet<ReactorChild>();
- selectables = 0;
- timer = new Timer(collector);
- this.io = io;
- wakeup = this.io.pipe();
- mark();
- attachments = new RecordImpl();
- }
-
- public ReactorImpl() throws IOException {
- this(new IOImpl());
- }
-
- @Override
- public void free() {
- if (wakeup.source().isOpen()) {
- try {
- wakeup.source().close();
- } catch(IOException e) {
- // Ignore.
- }
- }
- if (wakeup.sink().isOpen()) {
- try {
- wakeup.sink().close();
- } catch(IOException e) {
- // Ignore
- }
- }
-
- if (selector != null) {
- selector.free();
- }
-
- for (ReactorChild child : children) {
- child.free();
- }
- }
-
- @Override
- public Record attachments() {
- return attachments;
- }
-
- @Override
- public long getTimeout() {
- return timeout;
- }
-
- @Override
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
-
- @Override
- public Handler getGlobalHandler() {
- return global;
- }
-
- @Override
- public void setGlobalHandler(Handler handler) {
- global = handler;
- }
-
- @Override
- public Handler getHandler() {
- return handler;
- }
-
- @Override
- public void setHandler(Handler handler) {
- this.handler = handler;
- }
-
- @Override
- public Set<ReactorChild> children() {
- return children;
- }
-
- @Override
- public Collector collector() {
- return collector;
- }
-
- private class ReleaseCallback implements Callback {
- private final ReactorImpl reactor;
- private final ReactorChild child;
- public ReleaseCallback(ReactorImpl reactor, ReactorChild child) {
- this.reactor = reactor;
- this.child = child;
- }
- @Override
- public void run(Selectable selectable) {
- if (reactor.children.remove(child)) {
- --reactor.selectables;
- child.free();
- }
- }
- }
-
- @Override
- public Selectable selectable() {
- return selectable(null);
- }
-
- public SelectableImpl selectable(ReactorChild child) {
- SelectableImpl result = new SelectableImpl();
- result.setCollector(collector);
- collector.put(Type.SELECTABLE_INIT, result);
- result.setReactor(this);
- children.add(child == null ? result : child);
- result.onRelease(new ReleaseCallback(this, child == null ? result : child));
- ++selectables;
- return result;
- }
-
- @Override
- public void update(Selectable selectable) {
- SelectableImpl selectableImpl = (SelectableImpl)selectable;
- if (!selectableImpl.isTerminated()) {
- if (selectableImpl.isTerminal()) {
- selectableImpl.terminated();
- collector.put(Type.SELECTABLE_FINAL, selectable);
- } else {
- collector.put(Type.SELECTABLE_UPDATED, selectable);
- }
- }
- }
-
- // pn_event_handler
- private Handler eventHandler(Event event) {
- Handler result;
- if (event.getLink() != null) {
- result = BaseHandler.getHandler(event.getLink());
- if (result != null) return result;
- }
- if (event.getSession() != null) {
- result = BaseHandler.getHandler(event.getSession());
- if (result != null) return result;
- }
- if (event.getConnection() != null) {
- result = BaseHandler.getHandler(event.getConnection());
- if (result != null) return result;
- }
-
- if (event.getTask() != null) {
- result = BaseHandler.getHandler(event.getTask());
- if (result != null) return result;
- }
-
- if (event.getSelectable() != null) {
- result = BaseHandler.getHandler(event.getSelectable());
- if (result != null) return result;
- }
-
- return handler;
- }
-
-
- @Override
- public void yield() {
- yield = true;
- }
-
- @Override
- public boolean quiesced() {
- Event event = collector.peek();
- if (event == null) return true;
- if (collector.more()) return false;
- return event.getEventType() == Type.REACTOR_QUIESCED;
- }
-
- @Override
- public boolean process() throws HandlerException {
- mark();
- EventType previous = null;
- while (true) {
- Event event = collector.peek();
- if (event != null) {
- if (yield) {
- yield = false;
- return true;
- }
- Handler handler = eventHandler(event);
- dispatch(event, handler);
- dispatch(event, global);
-
- if (event.getEventType() == Type.CONNECTION_FINAL) {
- children.remove(event.getConnection());
- }
- this.previous = event.getEventType();
- previous = this.previous;
- collector.pop();
-
- } else {
- if (!stop && more()) {
- if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) {
- collector.put(Type.REACTOR_QUIESCED, this);
- } else {
- return true;
- }
- } else {
- if (selectable != null) {
- selectable.terminate();
- update(selectable);
- selectable = null;
- } else {
- collector.put(Type.REACTOR_FINAL, this);
- return false;
- }
- }
- }
- }
- }
-
- private void dispatch(Event event, Handler handler) {
- ROOT.set(event, handler);
- event.dispatch(handler);
- }
-
- @Override
- public void wakeup() {
- try {
- wakeup.sink().write(ByteBuffer.allocate(1));
- } catch(ClosedChannelException channelClosedException) {
- // Ignore - pipe already closed by reactor being shutdown.
- } catch(IOException ioException) {
- throw new ReactorInternalException(ioException);
- }
- }
-
- @Override
- public void start() {
- collector.put(Type.REACTOR_INIT, this);
- selectable = timerSelectable();
- }
-
- @Override
- public void stop() throws HandlerException {
- stop = true;
- }
-
- private boolean more() {
- return timer.tasks() > 0 || selectables > 1;
- }
-
- @Override
- public void run() throws HandlerException {
- setTimeout(3141);
- start();
- while(process()) {}
- stop();
- process();
- collector = null;
- }
-
- // pn_reactor_schedule from reactor.c
- @Override
- public Task schedule(int delay, Handler handler) {
- Task task = timer.schedule(now + delay);
- ((TaskImpl)task).setReactor(this);
- BaseHandler.setHandler(task, handler);
- if (selectable != null) {
- selectable.setDeadline(timer.deadline());
- update(selectable);
- }
- return task;
- }
-
- private void expireSelectable(Selectable selectable) {
- ReactorImpl reactor = (ReactorImpl) selectable.getReactor();
- reactor.timer.tick(reactor.now);
- selectable.setDeadline(reactor.timer.deadline());
- reactor.update(selectable);
- }
-
- private class TimerReadable implements Callback {
-
- @Override
- public void run(Selectable selectable) {
- try {
- wakeup.source().read(ByteBuffer.allocate(64));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- expireSelectable(selectable);
- }
-
- }
-
- private class TimerExpired implements Callback {
- @Override
- public void run(Selectable selectable) {
- expireSelectable(selectable);
- }
- }
-
-
- // pni_timer_finalize from reactor.c
- private static class TimerFree implements Callback {
- @Override
- public void run(Selectable selectable) {
- try {
- selectable.getChannel().close();
- } catch(IOException ioException) {
- // Ignore
- }
- }
- }
-
- private Selectable timerSelectable() {
- Selectable sel = selectable();
- sel.setChannel(wakeup.source());
- sel.onReadable(new TimerReadable());
- sel.onExpired(new TimerExpired());
- sel.onFree(new TimerFree());
- sel.setReading(true);
- sel.setDeadline(timer.deadline());
- update(sel);
- return sel;
- }
-
- protected Selector getSelector() {
- return selector;
- }
-
- protected void setSelector(Selector selector) {
- this.selector = selector;
- }
-
- // pn_reactor_connection from connection.c
- @Override
- public Connection connection(Handler handler) {
- Connection connection = Proton.connection();
- BaseHandler.setHandler(connection, handler);
- connection.collect(collector);
- children.add(connection);
- ((ConnectionImpl)connection).setReactor(this);
- return connection;
- }
-
- @Override
- public Connection connectionToHost(String host, int port, Handler handler) {
- Connection connection = connection(handler);
- setConnectionHost(connection, host, port);
- return connection;
- }
-
- @Override
- public String getConnectionAddress(Connection connection) {
- Record r = connection.attachments();
- Address addr = r.get(CONNECTION_PEER_ADDRESS_KEY, Address.class);
- if (addr != null) {
- StringBuilder sb = new StringBuilder(addr.getHost());
- if (addr.getPort() != null)
- sb.append(":" + addr.getPort());
- return sb.toString();
- }
- return null;
- }
-
- @Override
- public void setConnectionHost(Connection connection,
- String host, int port) {
- Record r = connection.attachments();
- // cannot set the address on an incoming connection
- if (r.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) == null) {
- Address addr = new Address();
- addr.setHost(host);
- if (port == 0) {
- port = 5672;
- }
- addr.setPort(Integer.toString(port));
- r.set(CONNECTION_PEER_ADDRESS_KEY, Address.class, addr);
- } else {
- throw new IllegalStateException("Cannot set the host address on an incoming Connection");
- }
- }
-
- @Override
- public Acceptor acceptor(String host, int port) throws IOException {
- return this.acceptor(host, port, null);
- }
-
- @Override
- public Acceptor acceptor(String host, int port, Handler handler) throws IOException {
- return new AcceptorImpl(this, host, port, handler);
- }
-
- public IO getIO() {
- return io;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
deleted file mode 100644
index 6dde424..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-/**
- * Thrown by the reactor when it encounters an internal error condition.
- * This is analogous to an assertion failure in the proton-c reactor
- * implementation.
- */
-class ReactorInternalException extends RuntimeException {
-
- private static final long serialVersionUID = 8979674526584642454L;
-
- protected ReactorInternalException(String msg) {
- super(msg);
- }
-
- protected ReactorInternalException(Throwable cause) {
- super(cause);
- }
-
- protected ReactorInternalException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
deleted file mode 100644
index df4e6cc..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.nio.channels.SelectableChannel;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-
-public class SelectableImpl implements Selectable {
-
- private Callback readable;
- private Callback writable;
- private Callback error;
- private Callback expire;
- private Callback release;
- private Callback free;
-
- private boolean reading = false;
- private boolean writing = false;
- private long deadline = 0;
- private SelectableChannel channel;
- private Record attachments = new RecordImpl();
- private boolean registered;
- private Reactor reactor;
- private Transport transport;
- private boolean terminal;
- private boolean terminated;
-
- @Override
- public boolean isReading() {
- return reading;
- }
-
- @Override
- public boolean isWriting() {
- return writing;
- }
-
- @Override
- public long getDeadline() {
- return deadline;
- }
-
- @Override
- public void setReading(boolean reading) {
- this.reading = reading;
- }
-
- @Override
- public void setWriting(boolean writing) {
- this.writing = writing;
- }
-
- @Override
- public void setDeadline(long deadline) {
- this.deadline = deadline;
- }
-
- @Override
- public void onReadable(Callback runnable) {
- this.readable = runnable;
- }
-
- @Override
- public void onWritable(Callback runnable) {
- this.writable = runnable;
- }
-
- @Override
- public void onExpired(Callback runnable) {
- this.expire = runnable;
- }
-
- @Override
- public void onError(Callback runnable) {
- this.error = runnable;
- }
-
- @Override
- public void onRelease(Callback runnable) {
- this.release = runnable;
- }
-
- @Override
- public void onFree(Callback runnable) {
- this.free = runnable;
- }
-
- @Override
- public void readable() {
- if (readable != null) {
- readable.run(this);
- }
- }
-
- @Override
- public void writeable() {
- if (writable != null) {
- writable.run(this);
- }
- }
-
- @Override
- public void expired() {
- if (expire != null) {
- expire.run(this);
- }
- }
-
- @Override
- public void error() {
- if (error != null) {
- error.run(this);
- }
- }
-
- @Override
- public void release() {
- if (release != null) {
- release.run(this);
- }
- }
-
- @Override
- public void free() {
- if (free != null) {
- free.run(this);
- }
- }
-
- @Override
- public void setChannel(SelectableChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public SelectableChannel getChannel() {
- return channel;
- }
-
- @Override
- public boolean isRegistered() {
- return registered;
- }
-
- @Override
- public void setRegistered(boolean registered) {
- this.registered = registered;
- }
-
- @Override
- public void setCollector(final Collector collector) {
- final CollectorImpl collectorImpl = (CollectorImpl)collector;
-
- onReadable(new Callback() {
- @Override
- public void run(Selectable selectable) {
- collectorImpl.put(Type.SELECTABLE_READABLE, selectable);
- }
- });
- onWritable(new Callback() {
- @Override
- public void run(Selectable selectable) {
- collectorImpl.put(Type.SELECTABLE_WRITABLE, selectable);
- }
- });
- onExpired(new Callback() {
- @Override
- public void run(Selectable selectable) {
- collectorImpl.put(Type.SELECTABLE_EXPIRED, selectable);
- }
- });
- onError(new Callback() {
- @Override
- public void run(Selectable selectable) {
- collectorImpl.put(Type.SELECTABLE_ERROR, selectable);
- }
- });
- }
-
- @Override
- public Reactor getReactor() {
- return reactor;
- }
-
- @Override
- public void terminate() {
- terminal = true;
- }
-
- @Override
- public boolean isTerminal() {
- return terminal;
- }
-
- protected Transport getTransport() {
- return transport;
- }
-
- protected void setTransport(Transport transport) {
- this.transport = transport;
- }
-
- protected void setReactor(Reactor reactor) {
- this.reactor = reactor;
- }
-
- @Override
- public Record attachments() {
- return attachments;
- }
-
- public boolean isTerminated() {
- return terminated;
- }
-
- public void terminated() {
- terminated = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
deleted file mode 100644
index b4efb39..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selector;
-
-class SelectorImpl implements Selector {
-
- private final java.nio.channels.Selector selector;
- private final HashSet<Selectable> selectables = new HashSet<Selectable>();
- private final HashSet<Selectable> readable = new HashSet<Selectable>();
- private final HashSet<Selectable> writeable = new HashSet<Selectable>();
- private final HashSet<Selectable> expired = new HashSet<Selectable>();
- private final HashSet<Selectable> error = new HashSet<Selectable>();
-
- protected SelectorImpl(IO io) throws IOException {
- selector = io.selector();
- }
-
- @Override
- public void add(Selectable selectable) throws IOException {
- // Selectable can be 'null' - if this is the case it can only ever receive expiry events.
- if (selectable.getChannel() != null) {
- selectable.getChannel().configureBlocking(false);
- SelectionKey key = selectable.getChannel().register(selector, 0);
- key.attach(selectable);
- }
- selectables.add(selectable);
- update(selectable);
- }
-
- @Override
- public void update(Selectable selectable) {
- if (selectable.getChannel() != null) {
- int interestedOps = 0;
- if (selectable.getChannel() instanceof SocketChannel &&
- ((SocketChannel)selectable.getChannel()).isConnectionPending()) {
- interestedOps |= SelectionKey.OP_CONNECT;
- } else {
- if (selectable.isReading()) {
- if (selectable.getChannel() instanceof ServerSocketChannel) {
- interestedOps |= SelectionKey.OP_ACCEPT;
- } else {
- interestedOps |= SelectionKey.OP_READ;
- }
- }
- if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
- }
- SelectionKey key = selectable.getChannel().keyFor(selector);
- key.interestOps(interestedOps);
- }
- }
-
- @Override
- public void remove(Selectable selectable) {
- if (selectable.getChannel() != null) {
- SelectionKey key = selectable.getChannel().keyFor(selector);
- if (key != null) {
- key.cancel();
- key.attach(null);
- }
- }
- selectables.remove(selectable);
- }
-
- @Override
- public void select(long timeout) throws IOException {
-
- long now = System.currentTimeMillis();
- if (timeout > 0) {
- long deadline = 0;
- // XXX: Note: this differs from the C code which requires a call to update() to make deadline changes take affect
- for (Selectable selectable : selectables) {
- long d = selectable.getDeadline();
- if (d > 0) {
- deadline = (deadline == 0) ? d : Math.min(deadline, d);
- }
- }
-
- if (deadline > 0) {
- long delta = deadline - now;
- if (delta < 0) {
- timeout = 0;
- } else if (delta < timeout) {
- timeout = delta;
- }
- }
- }
-
- error.clear();
-
- long awoken = 0;
- if (timeout > 0) {
- long remainingTimeout = timeout;
- while(remainingTimeout > 0) {
- selector.select(remainingTimeout);
- awoken = System.currentTimeMillis();
-
- for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
- SelectionKey key = iterator.next();
- if (key.isConnectable()) {
- try {
- ((SocketChannel)key.channel()).finishConnect();
- update((Selectable)key.attachment());
- } catch(IOException ioException) {
- SelectableImpl selectable = (SelectableImpl)key.attachment();
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.getSymbol("proton:io"));
- condition.setDescription(ioException.getMessage());
- Transport transport = selectable.getTransport();
- if (transport != null) {
- transport.setCondition(condition);
- transport.close_tail();
- transport.close_head();
- transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
- }
- error.add(selectable);
- }
- iterator.remove();
- }
- }
- if (!selector.selectedKeys().isEmpty()) {
- break;
- }
- remainingTimeout = remainingTimeout - (awoken - now);
- }
- } else {
- selector.selectNow();
- awoken = System.currentTimeMillis();
- }
-
- readable.clear();
- writeable.clear();
- expired.clear();
- for (SelectionKey key : selector.selectedKeys()) {
- Selectable selectable = (Selectable)key.attachment();
- if (key.isReadable()) readable.add(selectable);
- if (key.isAcceptable()) readable.add(selectable);
- if (key.isWritable()) writeable.add(selectable);
- }
- selector.selectedKeys().clear();
- // XXX: Note: this is different to the C code which evaluates expiry at the point the selectable is iterated over.
- for (Selectable selectable : selectables) {
- long deadline = selectable.getDeadline();
- if (deadline > 0 && awoken >= deadline) {
- expired.add(selectable);
- }
- }
- }
-
- @Override
- public Iterator<Selectable> readable() {
- return readable.iterator();
- }
-
- @Override
- public Iterator<Selectable> writeable() {
- return writeable.iterator();
- }
-
- @Override
- public Iterator<Selectable> expired() {
- return expired.iterator();
- }
-
- @Override
- public Iterator<Selectable> error() {
- return error.iterator();
- }
-
- @Override
- public void free() {
- try {
- selector.close();
- } catch(IOException ioException) {
- // Ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
deleted file mode 100644
index 11bb6b8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Task;
-
-public class TaskImpl implements Task, Comparable<TaskImpl> {
- private final long deadline;
- private final int counter;
- private boolean cancelled = false;
- private final AtomicInteger count = new AtomicInteger();
- private Record attachments = new RecordImpl();
- private Reactor reactor;
-
- public TaskImpl(long deadline) {
- this.deadline = deadline;
- this.counter = count.getAndIncrement();
- }
-
- @Override
- public int compareTo(TaskImpl other) {
- int result;
- if (deadline < other.deadline) {
- result = -1;
- } else if (deadline > other.deadline) {
- result = 1;
- } else {
- result = counter - other.counter;
- }
- return result;
- }
-
- @Override
- public long deadline() {
- return deadline;
- }
-
- public boolean isCancelled() {
- return cancelled;
- }
-
- @Override
- public void cancel() {
- cancelled = true;
- }
-
- public void setReactor(Reactor reactor) {
- this.reactor = reactor;
- }
-
- @Override
- public Reactor getReactor() {
- return reactor;
- }
-
- @Override
- public Record attachments() {
- return attachments;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
deleted file mode 100644
index b8df19d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.util.PriorityQueue;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.reactor.Task;
-
-public class Timer {
-
- private CollectorImpl collector;
- private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
-
- public Timer(Collector collector) {
- this.collector = (CollectorImpl)collector;
- }
-
- Task schedule(long deadline) {
- TaskImpl task = new TaskImpl(deadline);
- tasks.add(task);
- return task;
- }
-
- long deadline() {
- flushCancelled();
- if (tasks.size() > 0) {
- Task task = tasks.peek();
- return task.deadline();
- } else {
- return 0;
- }
- }
-
- private void flushCancelled() {
- while (!tasks.isEmpty()) {
- TaskImpl task = tasks.peek();
- if (task.isCancelled())
- tasks.poll();
- else
- break;
- }
- }
-
- void tick(long now) {
- while(!tasks.isEmpty()) {
- TaskImpl task = tasks.peek();
- if (now >= task.deadline()) {
- tasks.poll();
- if (!task.isCancelled())
- collector.put(Type.TIMER_TASK, task);
- } else {
- break;
- }
- }
- }
-
- int tasks() {
- flushCancelled();
- return tasks.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
deleted file mode 100644
index 46a716b..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.codec.impl.DataFactoryImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
deleted file mode 100644
index 00e7a60..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.driver.impl.DriverFactoryImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
deleted file mode 100644
index 33f9865..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.engine.impl.EngineFactoryImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
deleted file mode 100644
index 99eb726..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.message.impl.MessageFactoryImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
deleted file mode 100644
index d0beeb4..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.messenger.impl.MessengerFactoryImpl
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org