You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/10 15:29:08 UTC

[plc4x] 01/01: Current Implementation.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch netty-serial-nio
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 98fdbb15661ae4c3e9601082ec941d9ded9ccdca
Author: julian <j....@pragmaticminds.de>
AuthorDate: Sat Aug 10 17:29:00 2019 +0200

    Current Implementation.
---
 .../plc4x/java/base/connection/SerialChannel.java  | 280 +++++++++++++++++++++
 .../java/base/connection/SerialChannelFactory.java |   9 +-
 .../base/connection/SerialPollingSelector.java     |  94 +++++++
 .../base/connection/SerialSelectorProvider.java    |  68 +++++
 .../java/base/connection/SerialSocketChannel.java  | 142 +++++++++++
 .../base/connection/SerialChannelFactoryTest.java  |  57 +++++
 6 files changed, 646 insertions(+), 4 deletions(-)

diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
new file mode 100644
index 0000000..4074058
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.nio.AbstractNioByteChannel;
+import io.netty.channel.nio.AbstractNioChannel;
+import io.netty.channel.nio.NioEventLoop;
+import io.netty.channel.socket.DuplexChannel;
+import org.apache.commons.lang3.NotImplementedException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.nio.channels.*;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+public class SerialChannel extends AbstractNioByteChannel implements DuplexChannel {
+
+
+    public SerialChannel() {
+        this(null, new SerialSocketChannel(new SerialSelectorProvider()));
+    }
+
+    /**
+     * Create a new instance
+     *
+     * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
+     * @param ch     the underlying {@link SelectableChannel} on which it operates
+     */
+    protected SerialChannel(Channel parent, SelectableChannel ch) {
+        super(parent, ch);
+    }
+
+    @Override
+    public NioUnsafe unsafe() {
+        return new SerialNioUnsafe();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+        return false;
+    }
+
+    @Override
+    public ChannelFuture shutdownInput() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture shutdownInput(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+        return false;
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture shutdownOutput(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public ChannelFuture shutdown() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture shutdown(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    protected long doWriteFileRegion(FileRegion region) throws Exception {
+        return 0;
+    }
+
+    @Override
+    protected int doReadBytes(ByteBuf buf) throws Exception {
+        return 0;
+    }
+
+    @Override
+    protected int doWriteBytes(ByteBuf buf) throws Exception {
+        return 0;
+    }
+
+    @Override
+    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        return false;
+    }
+
+    @Override
+    protected void doFinishConnect() throws Exception {
+
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+        return null;
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return null;
+    }
+
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+
+    }
+
+    @Override
+    protected void doDisconnect() throws Exception {
+
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return null;
+    }
+
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    private static class SerialNioUnsafe implements NioUnsafe {
+        @Override
+        public SelectableChannel ch() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void finishConnect() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void read() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void forceFlush() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public SocketAddress localAddress() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public SocketAddress remoteAddress() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void register(EventLoop eventLoop, ChannelPromise promise) {
+            // Register
+            if (!(eventLoop instanceof NioEventLoop)) {
+                throw new IllegalArgumentException("Only valid for NioEventLoop!");
+            }
+            if (!(promise.channel() instanceof SerialChannel)) {
+                throw new IllegalArgumentException("Only valid for " + SerialChannel.class + " but is " + promise.channel().getClass());
+            }
+            // Register channel to event loop
+            // We have to use reflection here, I fear
+            try {
+                Method method = NioEventLoop.class.getDeclaredMethod("unwrappedSelector");
+                method.setAccessible(true);
+                SerialPollingSelector selector = (SerialPollingSelector) method.invoke(eventLoop);
+
+                selector.register((AbstractSelectableChannel) promise.channel(), 0, null);
+            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                e.printStackTrace();
+                throw new NotImplementedException("Should register channel to event loop!!!");
+            }
+        }
+
+        @Override
+        public void bind(SocketAddress localAddress, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void disconnect(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void close(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void closeForcibly() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void deregister(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void beginRead() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void write(Object msg, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void flush() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public ChannelOutboundBuffer outboundBuffer() {
+            throw new NotImplementedException("");
+        }
+    }
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
index 6f93828..27b84c3 100644
--- a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.jsc.JSerialCommChannel;
 import io.netty.channel.jsc.JSerialCommDeviceAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 
+import java.util.concurrent.Executor;
+
 public class SerialChannelFactory implements ChannelFactory {
 
     private final String serialPort;
@@ -43,10 +46,8 @@ public class SerialChannelFactory implements ChannelFactory {
 
         try {
             Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new OioEventLoopGroup());
-            bootstrap.channel(JSerialCommChannel.class);
-            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
+            bootstrap.group(new NioEventLoopGroup(0, (Executor)null, new SerialSelectorProvider()));
+            bootstrap.channel(SerialChannel.class);
             bootstrap.handler(channelHandler);
             // Start the client.
             ChannelFuture f = bootstrap.connect(address).sync();
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
new file mode 100644
index 0000000..95c8993
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialPollingSelector extends AbstractSelector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SerialPollingSelector.class);
+
+    public SerialPollingSelector(SelectorProvider selectorProvider) {
+        super(selectorProvider);
+    }
+
+    @Override
+    public Set<SelectionKey> keys() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<SelectionKey> selectedKeys() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public int selectNow() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public int select(long timeout) throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public int select() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public Selector wakeup() {
+        // TODO do we have to do something here?
+        return this;
+    }
+
+    @Override
+    protected void implCloseSelector() throws IOException {
+        // TODO should we do something here?
+    }
+
+    @Override
+    protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
+        logger.debug("Registering Channel for selector {} with operations {}", ch, ops);
+        if (!(ch instanceof SerialSocketChannel)) {
+            throw new IllegalArgumentException("Given channel has to be of type " + SerialSocketChannel.class);
+        }
+        throw new NotImplementedException("");
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java
new file mode 100644
index 0000000..f7f37a0
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+import java.io.IOException;
+import java.net.ProtocolFamily;
+import java.nio.channels.*;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialSelectorProvider extends SelectorProvider {
+
+    @Override
+    public DatagramChannel openDatagramChannel() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public Pipe openPipe() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public AbstractSelector openSelector() throws IOException {
+        return new SerialPollingSelector(this);
+    }
+
+    @Override
+    public ServerSocketChannel openServerSocketChannel() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    @Override
+    public SocketChannel openSocketChannel() throws IOException {
+        return new SerialSocketChannel(this);
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
new file mode 100644
index 0000000..e1b93bf
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketOption;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Set;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialSocketChannel extends SocketChannel {
+
+    /**
+     * Initializes a new instance of this class.
+     *
+     * @param provider The provider that created this channel
+     */
+    protected SerialSocketChannel(SelectorProvider provider) {
+        super(provider);
+    }
+
+    @Override
+    public SocketChannel bind(SocketAddress local) throws IOException {
+        return null;
+    }
+
+    @Override
+    public <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException {
+        return null;
+    }
+
+    @Override
+    public SocketChannel shutdownInput() throws IOException {
+        return null;
+    }
+
+    @Override
+    public SocketChannel shutdownOutput() throws IOException {
+        return null;
+    }
+
+    @Override
+    public Socket socket() {
+        return null;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+    @Override
+    public boolean connect(SocketAddress remote) throws IOException {
+        return false;
+    }
+
+    @Override
+    public boolean finishConnect() throws IOException {
+        return false;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() throws IOException {
+        return null;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() throws IOException {
+        return null;
+    }
+
+    @Override
+    public <T> T getOption(SocketOption<T> name) throws IOException {
+        return null;
+    }
+
+    @Override
+    public Set<SocketOption<?>> supportedOptions() {
+        return null;
+    }
+
+    @Override
+    protected void implCloseSelectableChannel() throws IOException {
+
+    }
+
+    @Override
+    protected void implConfigureBlocking(boolean block) throws IOException {
+
+    }
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
new file mode 100644
index 0000000..b2aa31b
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * TODO write comment
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+public class SerialChannelFactoryTest {
+
+    @Test
+    public void createChannel() throws PlcConnectionException {
+        SerialChannelFactory asdf = new SerialChannelFactory("asdf");
+        asdf.createChannel(new ChannelHandler() {
+            @Override
+            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+
+            }
+
+            @Override
+            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+
+            }
+
+            @Override
+            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+
+            }
+        });
+    }
+}
\ No newline at end of file