You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:45 UTC
[3/4] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-zk-ce
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81014550
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81014550
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81014550
Branch: refs/heads/ignite-zk-ce
Commit: 8101455034a723eadf94dabe4f5d8f1fc0ae5606
Parents: ed492a4 91be7af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:38:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 10:38:07 2017 +0300
----------------------------------------------------------------------
.../spi/communication/tcp/TcpCommunicationSpi.java | 3 +--
.../TcpCommunicationConnectionCheckFuture.java | 17 ++++++++++++++++-
.../ignite/ml/knn/models/KNNModelFormat.java | 6 +++++-
3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 99e1eca,0000000..6cb5622
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@@ -1,461 -1,0 +1,476 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
++import org.apache.ignite.events.DiscoveryEvent;
++import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
++import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
++import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
++import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
++
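+/**
+ * Future that checks whether connections can be established to the given list of nodes.
+ * It completes with a {@link BitSet} in which bit {@code i} holds the check result for the {@code i}-th node.
+ */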
- public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
++public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
+ /** Session meta key under which a {@code SingleAddressConnectFuture} is stored for its connect session. */
+ public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Dummy connection key put into session meta to identify a connection-check outgoing connection. */
+ public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
+
+ /** Atomic updater for the {@code done} flag of {@code SingleAddressConnectFuture}. */
+ private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+ /** Atomic updater for the {@code resCnt} counter of {@code MultipleAddressesConnectFuture}. */
+ private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+ /** Number of nodes for which a result has been received; the future completes when it reaches {@code nodes.size()}. */
+ private final AtomicInteger resCntr = new AtomicInteger();
+
+ /** Nodes to check; the position of a node in this list is its bit index in the result. */
+ private final List<ClusterNode> nodes;
+
+ /** Per-node connect futures, indexed like {@link #nodes}; assigned in {@link #init(long)}, entries may be {@code null}. */
+ private volatile ConnectFuture[] futs;
+
+ /** NIO server used to create and cancel the connect sessions. */
+ private final GridNioServer nioSrvr;
+
+ /** Communication SPI; provides the SPI context and node addresses. */
+ private final TcpCommunicationSpi spi;
+
+ /** ID of this object as a timeout object, see {@link #id()}. */
+ private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+ /** Result bits, one per node; written under {@code synchronized (resBitSet)}. */
+ private final BitSet resBitSet;
+
+ /** Value returned by {@link #endTime()}; set in {@link #init(long)}. */
+ private long endTime;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+  * Creates a check future for the given nodes. No connection attempt is made here;
+  * the checks are started by {@link #init(long)}.
+  *
+  * @param spi SPI instance.
+  * @param log Logger.
+  * @param nioSrvr NIO server.
+  * @param nodes Nodes to check.
+  */
+ public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
+     IgniteLogger log,
+     GridNioServer nioSrvr,
+     List<ClusterNode> nodes)
+ {
+     // One result bit per node, addressed by the node's position in the list.
+     this.nodes = nodes;
+     this.resBitSet = new BitSet(nodes.size());
+
+     this.nioSrvr = nioSrvr;
+     this.log = log;
+     this.spi = spi;
+ }
+
++ /**
++  * Local event listener callback. At the moment it only validates (via assertions) that the event is a
++  * {@link DiscoveryEvent} of type {@code EVT_NODE_LEFT} or {@code EVT_NODE_FAILED}; no other action is taken.
++  *
++  * @param evt Received event.
++  */
++ @Override public void onEvent(Event evt) {
++     assert evt instanceof DiscoveryEvent : evt;
++     assert !(evt.type() != EVT_NODE_LEFT && evt.type() != EVT_NODE_FAILED);
++ }
++
+ /**
+  * Starts the connection checks for all nodes and arms the timeout.
+  * <p>
+  * The local node is reported as reachable right away. A node that is no longer known to the SPI context, or
+  * whose addresses cannot be resolved, is reported as unreachable right away. For every other node a connect
+  * future is started and remembered so that {@link #onTimeout()} can cancel it.
+  *
+  * @param timeout Connect timeout, added to the current time to obtain {@link #endTime()}.
+  */
+ public void init(long timeout) {
+     ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+     UUID locId = spi.getSpiContext().localNode().id();
+
+     for (int i = 0; i < nodes.size(); i++) {
+         ClusterNode node = nodes.get(i);
+
+         if (!node.id().equals(locId)) {
+             if (spi.getSpiContext().node(node.id()) == null) {
+                 receivedConnectionStatus(i, false);
+
+                 continue;
+             }
+
+             Collection<InetSocketAddress> addrs;
+
+             try {
+                 addrs = spi.nodeAddresses(node, false);
+             }
+             catch (Exception e) {
+                 U.error(log, "Failed to get node addresses: " + node, e);
+
+                 receivedConnectionStatus(i, false);
+
+                 continue;
+             }
+
+             if (addrs.size() == 1) {
+                 SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
+
+                 fut.init(addrs.iterator().next());
+
+                 futs[i] = fut;
+             }
+             else {
+                 MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
+
+                 fut.init(addrs);
+
+                 // Track it like the single-address case, otherwise onTimeout() could never cancel it.
+                 futs[i] = fut;
+             }
+         }
+         else
+             receivedConnectionStatus(i, true);
+     }
+
+     this.futs = futs;
+
++     spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
++
++     if (isDone()) {
++         // All results arrived before the listener was registered, so drop it here.
++         spi.getSpiContext().removeLocalEventListener(this);
++
++         return;
++     }
+
+     // The deadline lies in the future: now + timeout.
+     endTime = System.currentTimeMillis() + timeout;
+
+     spi.getSpiContext().addTimeoutObject(this);
+ }
+
+ /**
+  * Records the check result for one node and completes this future with the result bit set
+  * once a result has been recorded for every node.
+  *
+  * @param idx Node index.
+  * @param res Success flag.
+  */
+ private void receivedConnectionStatus(int idx, boolean res) {
+     assert resCntr.get() < nodes.size();
+
+     synchronized (resBitSet) {
+         if (res)
+             resBitSet.set(idx);
+         else
+             resBitSet.clear(idx);
+     }
+
+     int received = resCntr.incrementAndGet();
+
+     // The caller that delivers the last outstanding result completes the future.
+     if (received == nodes.size())
+         onDone(resBitSet);
+ }
+
+ /**
+  * Looks up the ID of the node stored at the given position of the checked nodes list.
+  *
+  * @param nodeIdx Node index.
+  * @return Node ID.
+  */
+ private UUID nodeId(int nodeIdx) {
+     ClusterNode node = nodes.get(nodeIdx);
+
+     return node.id();
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return The random ID generated when this future was created.
+  */
+ @Override public IgniteUuid id() {
+     return this.timeoutObjId;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return The end time stored by {@link #init(long)}.
+  */
+ @Override public long endTime() {
+     return this.endTime;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Propagates the timeout to every connect future that was started by {@link #init(long)}.
+  * Does nothing if this future is already completed.
+  */
+ @Override public void onTimeout() {
+     // Already completed: every node has reported, nothing is left to cancel.
+     if (isDone())
+         return;
+
+     ConnectFuture[] futs = this.futs;
+
+     for (ConnectFuture fut : futs) {
+         // Entries are null for nodes whose result was known without connecting.
+         if (fut != null)
+             fut.onTimeout();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * On the call that actually completes the future, unregisters this object from the SPI context:
+  * both as a timeout object and as a local event listener.
+  */
+ @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+     if (super.onDone(res, err)) {
+         spi.getSpiContext().removeTimeoutObject(this);
+
+         // Registered in init(); without this the completed future would stay registered as a listener.
+         spi.getSpiContext().removeLocalEventListener(this);
+
+         return true;
+     }
+
+     return false;
+ }
+
+ /**
+ * Connect future that can be notified about the timeout of the whole check.
+ */
+ private interface ConnectFuture {
+ /**
+ * Called for each started connect future when the check times out.
+ */
+ void onTimeout();
+ }
+
+ /**
+  * Connection check for a single address of a node. The result is reported exactly once through
+  * {@link #onStatusReceived(boolean)}; the {@code done} flag guarantees the single report.
+  */
+ private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+     /** Index of the checked node in the list of nodes of the enclosing future. */
+     final int nodeIdx;
+
+     /** Completion flag: {@code 0} while in progress, {@code 1} once a result was reported. */
+     volatile int done;
+
+     /** Session meta passed to the NIO server for the connect session; set only if the connect is pending. */
+     Map<Integer, Object> sesMeta;
+
+     /** Channel used for the connection attempt. */
+     private SocketChannel ch;
+
+     /**
+      * @param nodeIdx Node index.
+      */
+     SingleAddressConnectFuture(int nodeIdx) {
+         this.nodeIdx = nodeIdx;
+     }
+
+     /**
+      * Opens a non-blocking channel and starts connecting to the address. If the channel cannot be set up
+      * or the connect call fails, the channel is closed and a negative result is reported immediately.
+      *
+      * @param addr Node address.
+      */
+     public void init(InetSocketAddress addr) {
+         boolean connect;
+
+         try {
+             ch = SocketChannel.open();
+
+             ch.configureBlocking(false);
+
+             ch.socket().setTcpNoDelay(true);
+             ch.socket().setKeepAlive(false);
+
+             connect = ch.connect(addr);
+         }
+         catch (Exception e) {
+             // Nobody else knows about the channel yet, so release it here to avoid leaking the socket.
+             if (ch != null) {
+                 try {
+                     ch.close();
+                 }
+                 catch (Exception ignored) {
+                     // Best effort: the check result is negative either way.
+                 }
+             }
+
+             finish(false);
+
+             return;
+         }
+
+         // NOTE(review): if connect() completes immediately (returns true) no session is created here and
+         // no result is reported until the timeout fires - confirm whether this case needs handling.
+         if (!connect) {
+             sesMeta = new GridLeanMap<>(3);
+
+             // Set dummy key to identify connection-check outgoing connection.
+             sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
+             sesMeta.put(SES_FUT_META, this);
+
+             nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                 @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+                     if (fut.error() != null)
+                         finish(false);
+                 }
+             });
+         }
+     }
+
+     /**
+      * Reports a negative result and, if this call was the one that set the result, asks the NIO server
+      * to cancel the pending connect.
+      */
+     void cancel() {
+         if (finish(false))
+             nioSrvr.cancelConnect(ch, sesMeta);
+     }
+
+     /** {@inheritDoc} */
+     @Override public void onTimeout() {
+         cancel();
+     }
+
+     /**
+      * Reports success only if the connected remote node has the ID of the node being checked.
+      *
+      * @param rmtNodeId ID of the node the connection was established to.
+      */
+     public void onConnected(UUID rmtNodeId) {
+         finish(nodeId(nodeIdx).equals(rmtNodeId));
+     }
+
+     /**
+      * Sets the result if it was not set yet.
+      *
+      * @param res Result.
+      * @return {@code True} if result was set by this call.
+      */
+     boolean finish(boolean res) {
+         if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+             onStatusReceived(res);
+
+             return true;
+         }
+
+         return false;
+     }
+
+     /**
+      * Delivers the result; by default it becomes the result for the whole node.
+      *
+      * @param res Result.
+      */
+     void onStatusReceived(boolean res) {
+         receivedConnectionStatus(nodeIdx, res);
+     }
+ }
+
+ /**
+  * Connection check for a node that has several addresses. One {@link SingleAddressConnectFuture} is started
+  * per address; the node is reported reachable as soon as any address succeeds and unreachable once all
+  * addresses have failed.
+  */
+ private class MultipleAddressesConnectFuture implements ConnectFuture {
+     /** Number of failed addresses, or {@code Integer.MAX_VALUE} once some address succeeded. */
+     volatile int resCnt;
+
+     /** Started per-address futures; published at the end of {@link #init(Collection)}, trailing entries may be {@code null}. */
+     volatile SingleAddressConnectFuture[] futs;
+
+     /**
+      * Number of addresses to check. Set before the first per-address future is started, so result callbacks
+      * never depend on {@link #futs} being published.
+      */
+     private volatile int addrCnt;
+
+     /** Index of the checked node in the list of nodes of the enclosing future. */
+     final int nodeIdx;
+
+     /**
+      * @param nodeIdx Node index.
+      */
+     MultipleAddressesConnectFuture(int nodeIdx) {
+         this.nodeIdx = nodeIdx;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void onTimeout() {
+         SingleAddressConnectFuture[] futs = this.futs;
+
+         // Not published if there were no addresses to check.
+         if (futs == null)
+             return;
+
+         for (SingleAddressConnectFuture fut : futs) {
+             if (fut != null)
+                 fut.onTimeout();
+         }
+     }
+
+     /**
+      * Starts one connection attempt per address. Stops starting new attempts as soon as a result for the
+      * node is known, and cancels the attempts that are still in progress.
+      *
+      * @param addrs Node addresses.
+      */
+     void init(Collection<InetSocketAddress> addrs) {
+         if (addrs.isEmpty()) {
+             // No per-address future would ever report, so report the failure directly.
+             receivedConnectionStatus(nodeIdx, false);
+
+             return;
+         }
+
+         SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+         // Must be known before any child can report: a child may fail synchronously inside its init().
+         addrCnt = futs.length;
+
+         int idx = 0;
+
+         for (InetSocketAddress addr : addrs) {
+             SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+                 @Override void onStatusReceived(boolean res) {
+                     receivedAddressStatus(res);
+                 }
+             };
+
+             fut.init(addr);
+
+             futs[idx++] = fut;
+
+             // Result is already known, no need to try the remaining addresses.
+             if (done())
+                 break;
+         }
+
+         // Publish even on early exit so the started attempts can be cancelled.
+         this.futs = futs;
+
+         // Close race: the result may have arrived before the array was published.
+         if (done())
+             cancelFutures();
+     }
+
+     /**
+      * @return {@code True} if some address succeeded or all addresses failed.
+      */
+     private boolean done() {
+         int resCnt0 = resCnt;
+
+         return resCnt0 == Integer.MAX_VALUE || resCnt0 == addrCnt;
+     }
+
+     /**
+      * Cancels all published per-address futures; futures that already have a result are not affected.
+      */
+     private void cancelFutures() {
+         SingleAddressConnectFuture[] futs = this.futs;
+
+         if (futs != null) {
+             for (SingleAddressConnectFuture fut : futs) {
+                 // Entries stay null for addresses that were never tried.
+                 if (fut != null)
+                     fut.cancel();
+             }
+         }
+     }
+
+     /**
+      * Handles the result of one address. The first success marks the counter with {@code Integer.MAX_VALUE}
+      * and reports the node as reachable; a failure increments the counter and reports the node as
+      * unreachable when the counter reaches the number of addresses.
+      *
+      * @param res Result.
+      */
+     void receivedAddressStatus(boolean res) {
+         if (res) {
+             for (;;) {
+                 int resCnt0 = resCnt;
+
+                 if (resCnt0 == Integer.MAX_VALUE)
+                     return;
+
+                 if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+                     receivedConnectionStatus(nodeIdx, true);
+
+                     cancelFutures(); // Cancel others connects if they are still in progress.
+
+                     return;
+                 }
+             }
+         }
+         else {
+             for (;;) {
+                 int resCnt0 = resCnt;
+
+                 if (resCnt0 == Integer.MAX_VALUE)
+                     return;
+
+                 int resCnt1 = resCnt0 + 1;
+
+                 if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+                     if (resCnt1 == addrCnt)
+                         receivedConnectionStatus(nodeIdx, false);
+
+                     return;
+                 }
+             }
+         }
+     }
+ }
+}