You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:45 UTC

[3/4] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce

Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81014550
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81014550
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81014550

Branch: refs/heads/ignite-zk-ce
Commit: 8101455034a723eadf94dabe4f5d8f1fc0ae5606
Parents: ed492a4 91be7af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:38:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 10:38:07 2017 +0300

----------------------------------------------------------------------
 .../spi/communication/tcp/TcpCommunicationSpi.java |  3 +--
 .../TcpCommunicationConnectionCheckFuture.java     | 17 ++++++++++++++++-
 .../ignite/ml/knn/models/KNNModelFormat.java       |  6 +++++-
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 99e1eca,0000000..6cb5622
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@@ -1,461 -1,0 +1,476 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.spi.communication.tcp.internal;
 +
 +import java.net.InetSocketAddress;
 +import java.nio.channels.SocketChannel;
 +import java.util.BitSet;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cluster.ClusterNode;
++import org.apache.ignite.events.DiscoveryEvent;
++import org.apache.ignite.events.Event;
 +import org.apache.ignite.internal.IgniteInternalFuture;
++import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 +import org.apache.ignite.internal.util.GridLeanMap;
 +import org.apache.ignite.internal.util.future.GridFutureAdapter;
 +import org.apache.ignite.internal.util.nio.GridNioServer;
 +import org.apache.ignite.internal.util.nio.GridNioSession;
 +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteInClosure;
 +import org.apache.ignite.lang.IgniteUuid;
 +import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 +import org.jetbrains.annotations.Nullable;
 +
++import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
++import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
++
 +/**
 + *
 + */
- public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
++public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
 +    /** Session future. */
 +    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /** */
 +    public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
 +
 +    /** */
 +    private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
 +        AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
 +
 +    /** */
 +    private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
 +        AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
 +
 +    /** */
 +    private final AtomicInteger resCntr = new AtomicInteger();
 +
 +    /** */
 +    private final List<ClusterNode> nodes;
 +
 +    /** */
 +    private volatile ConnectFuture[] futs;
 +
 +    /** */
 +    private final GridNioServer nioSrvr;
 +
 +    /** */
 +    private final TcpCommunicationSpi spi;
 +
 +    /** */
 +    private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
 +
 +    /** */
 +    private final BitSet resBitSet;
 +
 +    /** */
 +    private long endTime;
 +
 +    /** */
 +    private final IgniteLogger log;
 +
 +    /**
 +     * @param spi SPI instance.
 +     * @param log Logger.
 +     * @param nioSrvr NIO server.
 +     * @param nodes Nodes to check.
 +     */
 +    public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
 +        IgniteLogger log,
 +        GridNioServer nioSrvr,
 +        List<ClusterNode> nodes)
 +    {
 +        this.spi = spi;
 +        this.log = log;
 +        this.nioSrvr = nioSrvr;
 +        this.nodes = nodes;
 +
 +        resBitSet = new BitSet(nodes.size());
 +    }
 +
++    /** {@inheritDoc} */
++    @Override public void onEvent(Event evt) {
++        assert evt instanceof DiscoveryEvent : evt;
++        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
++
++    }
++
 +    /**
 +     * @param timeout Connect timeout.
 +     */
 +    public void init(long timeout) {
 +        ConnectFuture[] futs = new ConnectFuture[nodes.size()];
 +
 +        UUID locId = spi.getSpiContext().localNode().id();
 +
 +        for (int i = 0; i < nodes.size(); i++) {
 +            ClusterNode node = nodes.get(i);
 +
 +            if (!node.id().equals(locId)) {
 +                if (spi.getSpiContext().node(node.id()) == null) {
 +                    receivedConnectionStatus(i, false);
 +
 +                    continue;
 +                }
 +
 +                Collection<InetSocketAddress> addrs;
 +
 +                try {
 +                    addrs = spi.nodeAddresses(node, false);
 +                }
 +                catch (Exception e) {
 +                    U.error(log, "Failed to get node addresses: " + node, e);
 +
 +                    receivedConnectionStatus(i, false);
 +
 +                    continue;
 +                }
 +
 +                if (addrs.size() == 1) {
 +                    SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
 +
 +                    fut.init(addrs.iterator().next());
 +
 +                    futs[i] = fut;
 +                }
 +                else {
 +                    MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
 +
 +                    fut.init(addrs);
 +                }
 +            }
 +            else
 +                receivedConnectionStatus(i, true);
 +        }
 +
 +        this.futs = futs;
 +
++        spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
++
 +        if (!isDone()) {
 +            endTime = System.currentTimeMillis() - timeout;
 +
 +            spi.getSpiContext().addTimeoutObject(this);
 +        }
 +    }
 +
 +    /**
 +     * @param idx Node index.
 +     * @param res Success flag.
 +     */
 +    private void receivedConnectionStatus(int idx, boolean res) {
 +        assert resCntr.get() < nodes.size();
 +
 +        synchronized (resBitSet) {
 +            resBitSet.set(idx, res);
 +        }
 +
 +        if (resCntr.incrementAndGet() == nodes.size())
 +            onDone(resBitSet);
 +    }
 +
 +    /**
 +     * @param nodeIdx Node index.
 +     * @return Node ID.
 +     */
 +    private UUID nodeId(int nodeIdx) {
 +        return nodes.get(nodeIdx).id();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteUuid id() {
 +        return timeoutObjId;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long endTime() {
 +        return endTime;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onTimeout() {
 +        if (!isDone())
 +            return;
 +
 +        ConnectFuture[] futs = this.futs;
 +
 +        for (int i = 0; i < futs.length; i++) {
 +            ConnectFuture fut = futs[i];
 +
 +            if (fut != null)
 +                fut.onTimeout();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
 +        if (super.onDone(res, err)) {
 +            spi.getSpiContext().removeTimeoutObject(this);
 +
 +            return true;
 +        }
 +
 +        return false;
 +    }
 +
 +    /**
 +     *
 +     */
 +    private interface ConnectFuture {
 +        /**
 +         *
 +         */
 +        void onTimeout();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
 +        /** */
 +        final int nodeIdx;
 +
 +        /** */
 +        volatile int done;
 +
 +        /** */
 +        Map<Integer, Object> sesMeta;
 +
 +        /** */
 +        private SocketChannel ch;
 +
 +        /**
 +         * @param nodeIdx Node index.
 +         */
 +        SingleAddressConnectFuture(int nodeIdx) {
 +            this.nodeIdx = nodeIdx;
 +        }
 +
 +        /**
 +         * @param addr Node address.
 +         */
 +        public void init(InetSocketAddress addr) {
 +            boolean connect;
 +
 +            try {
 +                ch = SocketChannel.open();
 +
 +                ch.configureBlocking(false);
 +
 +                ch.socket().setTcpNoDelay(true);
 +                ch.socket().setKeepAlive(false);
 +
 +                connect = ch.connect(addr);
 +            }
 +            catch (Exception e) {
 +                finish(false);
 +
 +                return;
 +            }
 +
 +            if (!connect) {
 +                sesMeta = new GridLeanMap<>(3);
 +
 +                // Set dummy key to identify connection-check outgoing connection.
 +                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
 +                sesMeta.put(SES_FUT_META, this);
 +
 +                nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
 +                    @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
 +                        if (fut.error() != null)
 +                            finish(false);
 +                    }
 +                });
 +            }
 +        }
 +
 +        /**
 +         *
 +         */
 +        void cancel() {
 +            if (finish(false))
 +                nioSrvr.cancelConnect(ch, sesMeta);
 +        }
 +
 +        /** {@inheritDoc} */
 +        public void onTimeout() {
 +            cancel();
 +        }
 +
 +        /** {@inheritDoc} */
 +        public void onConnected(UUID rmtNodeId) {
 +            finish(nodeId(nodeIdx).equals(rmtNodeId));
 +        }
 +
 +        /**
 +         * @param res Result.
 +         * @return {@code True} if result was set by this call.
 +         */
 +        boolean finish(boolean res) {
 +            if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
 +                onStatusReceived(res);
 +
 +                return true;
 +            }
 +
 +            return false;
 +        }
 +
 +        /**
 +         * @param res Result.
 +         */
 +        void onStatusReceived(boolean res) {
 +            receivedConnectionStatus(nodeIdx, res);
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class MultipleAddressesConnectFuture implements ConnectFuture {
 +        /** */
 +        volatile int resCnt;
 +
 +        /** */
 +        volatile SingleAddressConnectFuture[] futs;
 +
 +        /** */
 +        final int nodeIdx;
 +
 +        /**
 +         * @param nodeIdx Node index.
 +         */
 +        MultipleAddressesConnectFuture(int nodeIdx) {
 +            this.nodeIdx = nodeIdx;
 +
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onTimeout() {
 +            SingleAddressConnectFuture[] futs = this.futs;
 +
 +            for (int i = 0; i < futs.length; i++) {
 +                ConnectFuture fut = futs[i];
 +
 +                if (fut != null)
 +                    fut.onTimeout();
 +            }
 +        }
 +
 +        /**
 +         * @param addrs Node addresses.
 +         */
 +        void init(Collection<InetSocketAddress> addrs) {
 +            SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
 +
 +            int idx = 0;
 +
 +            for (InetSocketAddress addr : addrs) {
 +                SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
 +                    @Override void onStatusReceived(boolean res) {
 +                        receivedAddressStatus(res);
 +                    }
 +                };
 +
 +                fut.init(addr);
 +
 +                futs[idx++] = fut;
 +
 +                if (done())
 +                    return;
 +            }
 +
 +            this.futs = futs;
 +
 +            // Close race.
 +            if (done())
 +                cancelFutures();
 +        }
 +
 +        /**
 +         * @return {@code True}
 +         */
 +        private boolean done() {
 +            int resCnt0 = resCnt;
 +
 +            return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
 +        }
 +
 +        /**
 +         *
 +         */
 +        private void cancelFutures() {
 +            SingleAddressConnectFuture[] futs = this.futs;
 +
 +            if (futs != null) {
 +                for (int i = 0; i < futs.length; i++) {
 +                    SingleAddressConnectFuture fut = futs[i];
 +
 +                    fut.cancel();
 +                }
 +            }
 +        }
 +
 +        /**
 +         * @param res Result.
 +         */
 +        void receivedAddressStatus(boolean res) {
 +            if (res) {
 +                for (;;) {
 +                    int resCnt0 = resCnt;
 +
 +                    if (resCnt0 == Integer.MAX_VALUE)
 +                        return;
 +
 +                    if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
 +                        receivedConnectionStatus(nodeIdx, true);
 +
 +                        cancelFutures(); // Cancel others connects if they are still in progress.
 +
 +                        return;
 +                    }
 +                }
 +            }
 +            else {
 +                for (;;) {
 +                    int resCnt0 = resCnt;
 +
 +                    if (resCnt0 == Integer.MAX_VALUE)
 +                        return;
 +
 +                    int resCnt1 = resCnt0 + 1;
 +
 +                    if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
 +                        if (resCnt1 == futs.length)
 +                            receivedConnectionStatus(nodeIdx, false);
 +
 +                        return;
 +                    }
 +                }
 +            }
 +        }
 +    }
 +}