You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/10 15:29:08 UTC
[plc4x] 01/01: Current Implementation.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch netty-serial-nio
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 98fdbb15661ae4c3e9601082ec941d9ded9ccdca
Author: julian <j....@pragmaticminds.de>
AuthorDate: Sat Aug 10 17:29:00 2019 +0200
Current Implementation.
---
.../plc4x/java/base/connection/SerialChannel.java | 280 +++++++++++++++++++++
.../java/base/connection/SerialChannelFactory.java | 9 +-
.../base/connection/SerialPollingSelector.java | 94 +++++++
.../base/connection/SerialSelectorProvider.java | 68 +++++
.../java/base/connection/SerialSocketChannel.java | 142 +++++++++++
.../base/connection/SerialChannelFactoryTest.java | 57 +++++
6 files changed, 646 insertions(+), 4 deletions(-)
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
new file mode 100644
index 0000000..4074058
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannel.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.nio.AbstractNioByteChannel;
+import io.netty.channel.nio.AbstractNioChannel;
+import io.netty.channel.nio.NioEventLoop;
+import io.netty.channel.socket.DuplexChannel;
+import org.apache.commons.lang3.NotImplementedException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.nio.channels.*;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+
+/**
+ * Work-in-progress Netty NIO byte channel for serial ports.
+ * <p>
+ * The channel operates on a {@link SerialSocketChannel}. Most transport operations are still
+ * stubs that transfer nothing, and the {@link NioUnsafe} handed out by {@link #unsafe()} only
+ * implements {@code register}; every other unsafe operation throws
+ * {@link NotImplementedException}.
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+public class SerialChannel extends AbstractNioByteChannel implements DuplexChannel {
+
+    /**
+     * Creates a channel without parent that operates on a new {@link SerialSocketChannel}
+     * obtained with a new {@link SerialSelectorProvider}.
+     */
+    public SerialChannel() {
+        this(null, new SerialSocketChannel(new SerialSelectorProvider()));
+    }
+
+    /**
+     * Create a new instance
+     *
+     * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
+     * @param ch the underlying {@link SelectableChannel} on which it operates
+     */
+    protected SerialChannel(Channel parent, SelectableChannel ch) {
+        super(parent, ch);
+    }
+
+    /**
+     * Returns the serial specific unsafe.
+     *
+     * @return a new {@link SerialNioUnsafe} on every call
+     */
+    @Override
+    public NioUnsafe unsafe() {
+        return new SerialNioUnsafe();
+    }
+
+    /**
+     * @return always {@code false}, the input side is never reported as shut down
+     */
+    @Override
+    public boolean isInputShutdown() {
+        return false;
+    }
+
+    /**
+     * Shutting down the input side is not supported.
+     *
+     * @return a future that is failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdownInput() {
+        return shutdownInput(newPromise());
+    }
+
+    /**
+     * Shutting down the input side is not supported.
+     *
+     * @param promise the promise to notify
+     * @return the given promise, failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdownInput(ChannelPromise promise) {
+        return failUnsupported(promise, "shutdownInput");
+    }
+
+    /**
+     * @return always {@code false}, the output side is never reported as shut down
+     */
+    @Override
+    public boolean isOutputShutdown() {
+        return false;
+    }
+
+    /**
+     * Shutting down the output side is not supported.
+     *
+     * @return a future that is failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdownOutput() {
+        return shutdownOutput(newPromise());
+    }
+
+    /**
+     * Shutting down the output side is not supported.
+     *
+     * @param promise the promise to notify
+     * @return the given promise, failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdownOutput(ChannelPromise promise) {
+        return failUnsupported(promise, "shutdownOutput");
+    }
+
+    /**
+     * @return always {@code false}, the channel is never reported as shut down
+     */
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    /**
+     * Shutting down both sides is not supported.
+     *
+     * @return a future that is failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdown() {
+        return shutdown(newPromise());
+    }
+
+    /**
+     * Shutting down both sides is not supported.
+     *
+     * @param promise the promise to notify
+     * @return the given promise, failed with an {@link UnsupportedOperationException}
+     */
+    @Override
+    public ChannelFuture shutdown(ChannelPromise promise) {
+        return failUnsupported(promise, "shutdown");
+    }
+
+    /**
+     * Fails the given promise instead of handing {@code null} to the caller, so that callers
+     * can safely chain listeners on the returned future.
+     *
+     * @param promise   the promise to fail
+     * @param operation name of the unsupported operation, used in the failure message
+     * @return the given promise
+     */
+    private static ChannelFuture failUnsupported(ChannelPromise promise, String operation) {
+        promise.tryFailure(new UnsupportedOperationException(
+            operation + " is not supported by " + SerialChannel.class.getSimpleName()));
+        return promise;
+    }
+
+    /**
+     * Stub: no file region is written.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    protected long doWriteFileRegion(FileRegion region) throws Exception {
+        return 0;
+    }
+
+    /**
+     * Stub: nothing is read into the buffer.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    protected int doReadBytes(ByteBuf buf) throws Exception {
+        return 0;
+    }
+
+    /**
+     * Stub: nothing is written from the buffer.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    protected int doWriteBytes(ByteBuf buf) throws Exception {
+        return 0;
+    }
+
+    /**
+     * Stub: no connection is established.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
+        return false;
+    }
+
+    /**
+     * Stub: does nothing.
+     */
+    @Override
+    protected void doFinishConnect() throws Exception {
+
+    }
+
+    /**
+     * @return always {@code null}, no local address is tracked yet
+     */
+    @Override
+    protected SocketAddress localAddress0() {
+        return null;
+    }
+
+    /**
+     * @return always {@code null}, no remote address is tracked yet
+     */
+    @Override
+    protected SocketAddress remoteAddress0() {
+        return null;
+    }
+
+    /**
+     * Stub: does nothing.
+     */
+    @Override
+    protected void doBind(SocketAddress localAddress) throws Exception {
+
+    }
+
+    /**
+     * Stub: does nothing.
+     */
+    @Override
+    protected void doDisconnect() throws Exception {
+
+    }
+
+    /**
+     * @return always {@code null}, no configuration is provided yet
+     */
+    // NOTE(review): callers that dereference the configuration will fail - TODO provide a real config.
+    @Override
+    public ChannelConfig config() {
+        return null;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    /**
+     * {@link NioUnsafe} of the serial channel. Only {@link #register(EventLoop, ChannelPromise)}
+     * is implemented, all other operations throw {@link NotImplementedException}.
+     */
+    private static class SerialNioUnsafe implements NioUnsafe {
+        @Override
+        public SelectableChannel ch() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void finishConnect() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void read() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void forceFlush() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public SocketAddress localAddress() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public SocketAddress remoteAddress() {
+            throw new NotImplementedException("");
+        }
+
+        /**
+         * Registers the java channel underlying the promise's {@link SerialChannel} with the
+         * {@link SerialPollingSelector} of the given event loop.
+         *
+         * @param eventLoop the event loop to register with, has to be a {@link NioEventLoop}
+         * @param promise   promise whose channel has to be a {@link SerialChannel}
+         * @throws IllegalArgumentException if the event loop or the channel has the wrong type
+         * @throws NotImplementedException  if the selector of the event loop cannot be accessed
+         */
+        @Override
+        public void register(EventLoop eventLoop, ChannelPromise promise) {
+            if (!(eventLoop instanceof NioEventLoop)) {
+                throw new IllegalArgumentException("Only valid for NioEventLoop!");
+            }
+            if (!(promise.channel() instanceof SerialChannel)) {
+                throw new IllegalArgumentException("Only valid for " + SerialChannel.class + " but is " + promise.channel().getClass());
+            }
+            // The selector works on java.nio channels. The Netty channel itself is not an
+            // AbstractSelectableChannel (casting it always fails), so hand over the java
+            // channel it wraps.
+            SerialChannel serialChannel = (SerialChannel) promise.channel();
+            AbstractSelectableChannel javaChannel = (AbstractSelectableChannel) serialChannel.javaChannel();
+            // Register channel to event loop
+            // We have to use reflection here, I fear
+            try {
+                Method method = NioEventLoop.class.getDeclaredMethod("unwrappedSelector");
+                method.setAccessible(true);
+                SerialPollingSelector selector = (SerialPollingSelector) method.invoke(eventLoop);
+
+                selector.register(javaChannel, 0, null);
+            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                // Keep the cause attached instead of printing it to stderr.
+                throw new NotImplementedException("Should register channel to event loop!!!", e);
+            }
+        }
+
+        @Override
+        public void bind(SocketAddress localAddress, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void disconnect(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void close(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void closeForcibly() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void deregister(ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void beginRead() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void write(Object msg, ChannelPromise promise) {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public void flush() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            throw new NotImplementedException("");
+        }
+
+        @Override
+        public ChannelOutboundBuffer outboundBuffer() {
+            throw new NotImplementedException("");
+        }
+    }
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
index 6f93828..27b84c3 100644
--- a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialChannelFactory.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.jsc.JSerialCommChannel;
import io.netty.channel.jsc.JSerialCommDeviceAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import java.util.concurrent.Executor;
+
public class SerialChannelFactory implements ChannelFactory {
private final String serialPort;
@@ -43,10 +46,8 @@ public class SerialChannelFactory implements ChannelFactory {
try {
Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(new OioEventLoopGroup());
- bootstrap.channel(JSerialCommChannel.class);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ bootstrap.group(new NioEventLoopGroup(0, (Executor)null, new SerialSelectorProvider()));
+ bootstrap.channel(SerialChannel.class);
bootstrap.handler(channelHandler);
// Start the client.
ChannelFuture f = bootstrap.connect(address).sync();
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
new file mode 100644
index 0000000..95c8993
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialPollingSelector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Work-in-progress {@link Selector} for serial channels.
+ * <p>
+ * The selector already owns a stable key set and selected-key set, but neither selecting nor
+ * registering is implemented yet; both still throw {@link NotImplementedException}.
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialPollingSelector extends AbstractSelector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SerialPollingSelector.class);
+
+    /** Keys of the channels registered with this selector. */
+    private final Set<SelectionKey> registeredKeys = new HashSet<>();
+
+    /** Read-only view of {@link #registeredKeys}, the key set must not be modified by callers. */
+    private final Set<SelectionKey> registeredKeysView = java.util.Collections.unmodifiableSet(registeredKeys);
+
+    /** Keys selected by the last selection operation, callers remove processed keys from it. */
+    private final Set<SelectionKey> readyKeys = new HashSet<>();
+
+    /**
+     * Creates a selector that belongs to the given provider.
+     *
+     * @param selectorProvider the provider that created this selector
+     */
+    public SerialPollingSelector(SelectorProvider selectorProvider) {
+        super(selectorProvider);
+    }
+
+    /**
+     * Returns this selector's key set. The same unmodifiable view is returned on every call so
+     * that all callers observe the same registrations.
+     *
+     * @return the key set of this selector
+     */
+    @Override
+    public Set<SelectionKey> keys() {
+        return registeredKeysView;
+    }
+
+    /**
+     * Returns this selector's selected-key set. The same set is returned on every call, so keys
+     * removed by a caller after processing stay removed.
+     *
+     * @return the selected-key set of this selector
+     */
+    @Override
+    public Set<SelectionKey> selectedKeys() {
+        return readyKeys;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int selectNow() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int select(long timeout) throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int select() throws IOException {
+        throw new NotImplementedException("");
+    }
+
+    /**
+     * Currently a no-op.
+     *
+     * @return this selector
+     */
+    @Override
+    public Selector wakeup() {
+        // TODO do we have to do something here?
+        return this;
+    }
+
+    /**
+     * Currently a no-op.
+     */
+    @Override
+    protected void implCloseSelector() throws IOException {
+        // TODO should we do something here?
+    }
+
+    /**
+     * Validates the channel type; the registration itself is not implemented yet.
+     *
+     * @param ch  the channel to register, has to be a {@link SerialSocketChannel}
+     * @param ops the interest set
+     * @param att the attachment, may be {@code null}
+     * @return never returns normally at the moment
+     * @throws IllegalArgumentException if the channel is not a {@link SerialSocketChannel}
+     * @throws NotImplementedException  for every {@link SerialSocketChannel}
+     */
+    @Override
+    protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
+        logger.debug("Registering Channel for selector {} with operations {}", ch, ops);
+        if (!(ch instanceof SerialSocketChannel)) {
+            throw new IllegalArgumentException("Given channel has to be of type " + SerialSocketChannel.class);
+        }
+        throw new NotImplementedException("");
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java
new file mode 100644
index 0000000..f7f37a0
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSelectorProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+import java.io.IOException;
+import java.net.ProtocolFamily;
+import java.nio.channels.*;
+import java.nio.channels.spi.AbstractSelector;
+import java.nio.channels.spi.SelectorProvider;
+
+/**
+ * {@link SelectorProvider} of the serial transport.
+ * <p>
+ * Only selectors and socket channels can be opened; all other factory methods throw
+ * {@link NotImplementedException}.
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialSelectorProvider extends SelectorProvider {
+
+    // ---- supported factory methods ----
+
+    /**
+     * @return a new {@link SerialPollingSelector} bound to this provider
+     */
+    @Override
+    public AbstractSelector openSelector() throws IOException {
+        final SerialPollingSelector pollingSelector = new SerialPollingSelector(this);
+        return pollingSelector;
+    }
+
+    /**
+     * @return a new {@link SerialSocketChannel} bound to this provider
+     */
+    @Override
+    public SocketChannel openSocketChannel() throws IOException {
+        final SerialSocketChannel serialChannel = new SerialSocketChannel(this);
+        return serialChannel;
+    }
+
+    // ---- unsupported factory methods ----
+
+    /**
+     * @throws NotImplementedException always
+     */
+    @Override
+    public DatagramChannel openDatagramChannel() throws IOException {
+        throw notImplemented();
+    }
+
+    /**
+     * @throws NotImplementedException always
+     */
+    @Override
+    public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
+        throw notImplemented();
+    }
+
+    /**
+     * @throws NotImplementedException always
+     */
+    @Override
+    public Pipe openPipe() throws IOException {
+        throw notImplemented();
+    }
+
+    /**
+     * @throws NotImplementedException always
+     */
+    @Override
+    public ServerSocketChannel openServerSocketChannel() throws IOException {
+        throw notImplemented();
+    }
+
+    /**
+     * Builds the exception shared by all unsupported factory methods.
+     */
+    private static NotImplementedException notImplemented() {
+        return new NotImplementedException("");
+    }
+
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
new file mode 100644
index 0000000..e1b93bf
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/main/java/org/apache/plc4x/java/base/connection/SerialSocketChannel.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketOption;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Set;
+
+/**
+ * Work-in-progress {@link SocketChannel} stand-in for a serial port.
+ * <p>
+ * All operations are stubs: nothing is connected, read or written. Methods that are specified
+ * to return this channel do so, and no socket options are supported.
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+class SerialSocketChannel extends SocketChannel {
+
+    /**
+     * Initializes a new instance of this class.
+     *
+     * @param provider The provider that created this channel
+     */
+    protected SerialSocketChannel(SelectorProvider provider) {
+        super(provider);
+    }
+
+    /**
+     * Stub: the address is ignored.
+     *
+     * @return this channel
+     */
+    @Override
+    public SocketChannel bind(SocketAddress local) throws IOException {
+        return this;
+    }
+
+    /**
+     * Stub: the option is silently ignored.
+     *
+     * @return this channel
+     */
+    @Override
+    public <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException {
+        return this;
+    }
+
+    /**
+     * Stub: does nothing.
+     *
+     * @return this channel
+     */
+    @Override
+    public SocketChannel shutdownInput() throws IOException {
+        return this;
+    }
+
+    /**
+     * Stub: does nothing.
+     *
+     * @return this channel
+     */
+    @Override
+    public SocketChannel shutdownOutput() throws IOException {
+        return this;
+    }
+
+    /**
+     * @return always {@code null}, a serial channel has no associated {@link Socket}
+     */
+    @Override
+    public Socket socket() {
+        return null;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+    /**
+     * Stub: no connection is established.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean connect(SocketAddress remote) throws IOException {
+        return false;
+    }
+
+    /**
+     * Stub: no connection is finished.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean finishConnect() throws IOException {
+        return false;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public SocketAddress getRemoteAddress() throws IOException {
+        return null;
+    }
+
+    /**
+     * Stub: nothing is read.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return 0;
+    }
+
+    /**
+     * Stub: nothing is read.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        return 0;
+    }
+
+    /**
+     * Stub: nothing is written.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return 0;
+    }
+
+    /**
+     * Stub: nothing is written.
+     *
+     * @return always {@code 0}
+     */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        return 0;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public SocketAddress getLocalAddress() throws IOException {
+        return null;
+    }
+
+    /**
+     * @return always {@code null}, no option values are stored
+     */
+    @Override
+    public <T> T getOption(SocketOption<T> name) throws IOException {
+        return null;
+    }
+
+    /**
+     * @return an empty set, no socket options are supported
+     */
+    @Override
+    public Set<SocketOption<?>> supportedOptions() {
+        return java.util.Collections.<SocketOption<?>>emptySet();
+    }
+
+    /**
+     * Stub: there is nothing to close yet.
+     */
+    @Override
+    protected void implCloseSelectableChannel() throws IOException {
+
+    }
+
+    /**
+     * Stub: the blocking mode has no effect yet.
+     */
+    @Override
+    protected void implConfigureBlocking(boolean block) throws IOException {
+
+    }
+}
diff --git a/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
new file mode 100644
index 0000000..b2aa31b
--- /dev/null
+++ b/plc4j/protocols/driver-bases/serial/src/test/java/org/apache/plc4x/java/base/connection/SerialChannelFactoryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.base.connection;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Smoke test for {@link SerialChannelFactory}: creates a channel for a dummy port name with a
+ * handler that ignores all callbacks.
+ *
+ * @author julian
+ * Created by julian on 2019-08-10
+ */
+public class SerialChannelFactoryTest {
+
+    @Test
+    public void createChannel() throws PlcConnectionException {
+        final SerialChannelFactory factory = new SerialChannelFactory("asdf");
+
+        // Handler that deliberately ignores every callback.
+        final ChannelHandler noOpHandler = new ChannelHandler() {
+            @Override
+            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+                // intentionally empty
+            }
+
+            @Override
+            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+                // intentionally empty
+            }
+
+            @Override
+            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                // intentionally empty
+            }
+        };
+
+        factory.createChannel(noOpHandler);
+    }
+}
\ No newline at end of file