You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/14 16:13:54 UTC
[6/7] ignite git commit: ignite-4587 CacheAtomicWriteOrderMode.CLOCK
mode is removed
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
index 36f2902..e69de29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshot.java
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Snapshot of time deltas for given topology.
- */
-public class GridClockDeltaSnapshot {
- /** Time delta version. */
- private final GridClockDeltaVersion ver;
-
- /** Deltas between coordinator and nodes by node ID. */
- private final Map<UUID, Long> deltas;
-
- /** Pending delta values. */
- @GridToStringExclude
- private final Map<UUID, DeltaAverage> pendingDeltas;
-
- /**
- * @param ver Snapshot version.
- * @param locNodeId Local node ID.
- * @param discoSnap Discovery snapshot.
- * @param avgSize Average size.
- */
- public GridClockDeltaSnapshot(
- GridClockDeltaVersion ver,
- UUID locNodeId,
- GridDiscoveryTopologySnapshot discoSnap,
- int avgSize
- ) {
- assert ver.topologyVersion() == discoSnap.topologyVersion();
-
- this.ver = ver;
-
- deltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f);
-
- pendingDeltas = new HashMap<>(discoSnap.topologyNodes().size(), 1.0f);
-
- for (ClusterNode n : discoSnap.topologyNodes()) {
- if (!locNodeId.equals(n.id()))
- pendingDeltas.put(n.id(), new DeltaAverage(avgSize));
- }
- }
-
- /**
- * @param ver Snapshot version.
- * @param deltas Deltas map.
- */
- public GridClockDeltaSnapshot(GridClockDeltaVersion ver, Map<UUID, Long> deltas) {
- this.ver = ver;
- this.deltas = deltas;
-
- pendingDeltas = Collections.emptyMap();
- }
-
- /**
- * @return Version.
- */
- public GridClockDeltaVersion version() {
- return ver;
- }
-
- /**
- * @return Map of collected deltas.
- */
- public Map<UUID, Long> deltas() {
- return deltas;
- }
-
- /**
- * Awaits either until snapshot is ready or timeout elapses.
- *
- * @param timeout Timeout to wait.
- * @throws IgniteInterruptedCheckedException If wait was interrupted.
- */
- public synchronized void awaitReady(long timeout) throws IgniteInterruptedCheckedException {
- long start = System.currentTimeMillis();
-
- try {
- while (!ready()) {
- long now = System.currentTimeMillis();
-
- if (start + timeout - now <= 0)
- return;
-
- wait(start + timeout - now);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- /**
- * Callback invoked when time delta is received from remote node.
- *
- * @param nodeId Node ID.
- * @param timeDelta Calculated time delta.
- * @return {@code True} if more samples needed from that node.
- */
- public synchronized boolean onDeltaReceived(UUID nodeId, long timeDelta) {
- DeltaAverage avg = pendingDeltas.get(nodeId);
-
- if (avg != null) {
- avg.onValue(timeDelta);
-
- if (avg.ready()) {
- pendingDeltas.remove(nodeId);
-
- deltas.put(nodeId, avg.average());
-
- if (ready())
- notifyAll();
-
- return false;
- }
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Callback invoked when node left.
- *
- * @param nodeId Left node ID.
- */
- public synchronized void onNodeLeft(UUID nodeId) {
- pendingDeltas.remove(nodeId);
-
- deltas.put(nodeId, 0L);
-
- if (ready())
- notifyAll();
- }
-
- /**
- * @return {@code True} if snapshot is ready.
- */
- public synchronized boolean ready() {
- return pendingDeltas.isEmpty();
- }
-
- /**
- * @return Collection of node IDs for which response was not received so far.
- */
- public synchronized Collection<UUID> pendingNodeIds() {
- // Must return copy.
- return new HashSet<>(pendingDeltas.keySet());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridClockDeltaSnapshot.class, this);
- }
-
- /**
- * Delta average.
- */
- private static class DeltaAverage {
- /** Delta values. */
- private long[] vals;
-
- /** Current index. */
- private int idx;
-
- /**
- * @param size Accumulator size.
- */
- private DeltaAverage(int size) {
- vals = new long[size];
- }
-
- /**
- * Adds value to accumulator.
- *
- * @param val Value to add.
- */
- public void onValue(long val) {
- if (idx < vals.length)
- vals[idx++] = val;
- }
-
- /**
- * Whether this average is complete.
- *
- * @return {@code True} if enough values is collected.
- */
- public boolean ready() {
- return idx == vals.length;
- }
-
- /**
- * @return Average delta.
- */
- public long average() {
- long sum = 0;
-
- for (long val : vals)
- sum += val;
-
- return sum / vals.length;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
index 4306d7e..e69de29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Message containing time delta map for all nodes.
- */
-public class GridClockDeltaSnapshotMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Snapshot version. */
- private GridClockDeltaVersion snapVer;
-
- /** Grid time deltas. */
- @GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = long.class)
- private Map<UUID, Long> deltas;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridClockDeltaSnapshotMessage() {
- // No-op.
- }
-
- /**
- * @param snapVer Snapshot version.
- * @param deltas Deltas map.
- */
- public GridClockDeltaSnapshotMessage(GridClockDeltaVersion snapVer, Map<UUID, Long> deltas) {
- this.snapVer = snapVer;
- this.deltas = deltas;
- }
-
- /**
- * @return Snapshot version.
- */
- public GridClockDeltaVersion snapshotVersion() {
- return snapVer;
- }
-
- /**
- * @return Time deltas map.
- */
- public Map<UUID, Long> deltas() {
- return deltas;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeMap("deltas", deltas, MessageCollectionItemType.UUID, MessageCollectionItemType.LONG))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeMessage("snapVer", snapVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- deltas = reader.readMap("deltas", MessageCollectionItemType.UUID, MessageCollectionItemType.LONG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- snapVer = reader.readMessage("snapVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridClockDeltaSnapshotMessage.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 60;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridClockDeltaSnapshotMessage.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
index 19c75e6..e69de29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaVersion.java
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Version for time delta snapshot.
- */
-public class GridClockDeltaVersion implements Message, Comparable<GridClockDeltaVersion>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Snapshot local version. */
- private long ver;
-
- /** Topology version. */
- private long topVer;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridClockDeltaVersion() {
- // No-op.
- }
-
- /**
- * @param ver Version.
- * @param topVer Topology version.
- */
- public GridClockDeltaVersion(long ver, long topVer) {
- this.ver = ver;
- this.topVer = topVer;
- }
-
- /**
- * @return Snapshot local version.
- */
- public long version() {
- return ver;
- }
-
- /**
- * @return Snapshot topology version.
- */
- public long topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(GridClockDeltaVersion o) {
- int res = Long.compare(topVer, o.topVer);
-
- if (res == 0)
- res = Long.compare(ver, o.ver);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof GridClockDeltaVersion))
- return false;
-
- GridClockDeltaVersion that = (GridClockDeltaVersion)o;
-
- return topVer == that.topVer && ver == that.ver;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = (int)(ver ^ (ver >>> 32));
-
- res = 31 * res + (int)(topVer ^ (topVer >>> 32));
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(ver);
- out.writeLong(topVer);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- ver = in.readLong();
- topVer = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeLong("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong("ver", ver))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- topVer = reader.readLong("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- ver = reader.readLong("ver");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridClockDeltaVersion.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 83;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridClockDeltaVersion.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
deleted file mode 100644
index 99dc817..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockMessage.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Time server message.
- */
-public class GridClockMessage {
- /** Packet size. */
- public static final int PACKET_SIZE = 48;
-
- /** Originating node ID. */
- private UUID origNodeId;
-
- /** Target node ID. */
- private UUID targetNodeId;
-
- /** Originating timestamp. */
- private long origTs;
-
- /** Remote node reply ts. */
- private long replyTs;
-
- /**
- * @param origNodeId Originating node ID.
- * @param targetNodeId Target node ID.
- * @param origTs Originating timestamp.
- * @param replyTs Reply timestamp.
- */
- public GridClockMessage(UUID origNodeId, UUID targetNodeId, long origTs, long replyTs) {
- this.origNodeId = origNodeId;
- this.targetNodeId = targetNodeId;
- this.origTs = origTs;
- this.replyTs = replyTs;
- }
-
- /**
- * @return Originating node ID.
- */
- public UUID originatingNodeId() {
- return origNodeId;
- }
-
- /**
- * @param origNodeId Originating node ID.
- */
- public void originatingNodeId(UUID origNodeId) {
- this.origNodeId = origNodeId;
- }
-
- /**
- * @return Target node ID.
- */
- public UUID targetNodeId() {
- return targetNodeId;
- }
-
- /**
- * @param targetNodeId Target node ID.
- */
- public void targetNodeId(UUID targetNodeId) {
- this.targetNodeId = targetNodeId;
- }
-
- /**
- * @return Originating timestamp.
- */
- public long originatingTimestamp() {
- return origTs;
- }
-
- /**
- * @param origTs Originating timestamp.
- */
- public void originatingTimestamp(long origTs) {
- this.origTs = origTs;
- }
-
- /**
- * @return Reply timestamp.
- */
- public long replyTimestamp() {
- return replyTs;
- }
-
- /**
- * @param replyTs Reply timestamp.
- */
- public void replyTimestamp(long replyTs) {
- this.replyTs = replyTs;
- }
-
- /**
- * Converts message to bytes to send over network.
- *
- * @return Bytes representing this packet.
- */
- public byte[] toBytes() {
- byte[] buf = new byte[PACKET_SIZE];
-
- int off = 0;
-
- off = U.longToBytes(origNodeId.getLeastSignificantBits(), buf, off);
- off = U.longToBytes(origNodeId.getMostSignificantBits(), buf, off);
-
- off = U.longToBytes(targetNodeId.getLeastSignificantBits(), buf, off);
- off = U.longToBytes(targetNodeId.getMostSignificantBits(), buf, off);
-
- off = U.longToBytes(origTs, buf, off);
-
- off = U.longToBytes(replyTs, buf, off);
-
- assert off == PACKET_SIZE;
-
- return buf;
- }
-
- /**
- * Constructs message from bytes.
- *
- * @param buf Bytes.
- * @param off Offset.
- * @param len Packet length.
- * @return Assembled message.
- * @throws IgniteCheckedException If message length is invalid.
- */
- public static GridClockMessage fromBytes(byte[] buf, int off, int len) throws IgniteCheckedException {
- if (len < PACKET_SIZE)
- throw new IgniteCheckedException("Failed to assemble time server packet (message is too short).");
-
- long lsb = U.bytesToLong(buf, off);
- long msb = U.bytesToLong(buf, off + 8);
-
- UUID origNodeId = new UUID(msb, lsb);
-
- lsb = U.bytesToLong(buf, off + 16);
- msb = U.bytesToLong(buf, off + 24);
-
- UUID targetNodeId = new UUID(msb, lsb);
-
- long origTs = U.bytesToLong(buf, off + 32);
- long replyTs = U.bytesToLong(buf, off + 40);
-
- return new GridClockMessage(origNodeId, targetNodeId, origTs, replyTs);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridClockMessage.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
index 51d396a..e69de29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockServer.java
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-
-/**
- * Time server that enables time synchronization between nodes.
- */
-public class GridClockServer {
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Datagram socket for message exchange. */
- private DatagramSocket sock;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Read worker. */
- private GridWorker readWorker;
-
- /** Instance of time processor. */
- private GridClockSyncProcessor clockSync;
-
- /**
- * Starts server.
- *
- * @param ctx Kernal context.
- * @throws IgniteCheckedException If server could not be started.
- */
- public void start(GridKernalContext ctx) throws IgniteCheckedException {
- this.ctx = ctx;
-
- clockSync = ctx.clockSync();
- log = ctx.log(GridClockServer.class);
-
- try {
- int startPort = ctx.config().getTimeServerPortBase();
- int portRange = ctx.config().getTimeServerPortRange();
- int endPort = portRange == 0 ? startPort : startPort + portRange - 1;
-
- InetAddress locHost;
-
- if (F.isEmpty(ctx.config().getLocalHost())) {
- try {
- locHost = U.getLocalHost();
- }
- catch (IOException ignored) {
- locHost = InetAddress.getLoopbackAddress();
-
- U.warn(log, "Failed to get local host address, will use loopback address: " + locHost);
- }
- }
- else
- locHost = InetAddress.getByName(ctx.config().getLocalHost());
-
- for (int p = startPort; p <= endPort; p++) {
- try {
- sock = new DatagramSocket(p, locHost);
-
- if (log.isDebugEnabled())
- log.debug("Successfully bound time server [host=" + locHost + ", port=" + p + ']');
-
- break;
- }
- catch (SocketException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to bind time server socket [host=" + locHost + ", port=" + p +
- ", err=" + e.getMessage() + ']');
- }
- }
-
- if (sock == null)
- throw new IgniteCheckedException("Failed to bind time server socket within specified port range " +
- "[locHost=" + locHost + ", startPort=" + startPort + ", endPort=" + endPort + ']');
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to start time server (failed to get local host address)", e);
- }
- }
-
- /**
- * After start callback.
- */
- public void afterStart() {
- readWorker = new ReadWorker();
-
- IgniteThread th = new IgniteThread(readWorker);
-
- th.setPriority(Thread.MAX_PRIORITY);
-
- th.start();
- }
-
- /**
- * Stops server.
- */
- public void stop() {
- // No-op.
- }
-
- /**
- * Before stop callback.
- */
- public void beforeStop() {
- if (readWorker != null)
- readWorker.cancel();
-
- U.closeQuiet(sock);
-
- if (readWorker != null)
- U.join(readWorker, log);
- }
-
- /**
- * Sends packet to remote node.
- *
- * @param msg Message to send.
- * @param addr Address.
- * @param port Port.
- * @throws IgniteCheckedException If send failed.
- */
- public void sendPacket(GridClockMessage msg, InetAddress addr, int port) throws IgniteCheckedException {
- try {
- DatagramPacket packet = new DatagramPacket(msg.toBytes(), GridClockMessage.PACKET_SIZE, addr, port);
-
- if (log.isDebugEnabled())
- log.debug("Sending time sync packet [msg=" + msg + ", addr=" + addr + ", port=" + port);
-
- sock.send(packet);
- }
- catch (IOException e) {
- if (!sock.isClosed())
- throw new IgniteCheckedException("Failed to send datagram message to remote node [addr=" + addr +
- ", port=" + port + ", msg=" + msg + ']', e);
- }
- }
-
- /**
- * @return Address to which this server is bound.
- */
- public InetAddress host() {
- return sock.getLocalAddress();
- }
-
- /**
- * @return Port to which this server is bound.
- */
- public int port() {
- return sock.getLocalPort();
- }
-
- /**
- * Message read worker.
- */
- private class ReadWorker extends GridWorker {
- /**
- * Creates read worker.
- */
- protected ReadWorker() {
- super(ctx.igniteInstanceName(), "grid-time-server-reader", GridClockServer.this.log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- DatagramPacket packet = new DatagramPacket(new byte[GridClockMessage.PACKET_SIZE],
- GridClockMessage.PACKET_SIZE);
-
- while (!isCancelled()) {
- try {
- // Read packet from buffer.
- sock.receive(packet);
-
- if (log.isDebugEnabled())
- log.debug("Received clock sync message from remote node [host=" + packet.getAddress() +
- ", port=" + packet.getPort() + ']');
-
- GridClockMessage msg = GridClockMessage.fromBytes(packet.getData(), packet.getOffset(),
- packet.getLength());
-
- clockSync.onMessageReceived(msg, packet.getAddress(), packet.getPort());
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to assemble clock server message (will ignore the packet) [host=" +
- packet.getAddress() + ", port=" + packet.getPort() + ", err=" + e.getMessage() + ']');
- }
- catch (IOException e) {
- if (!isCancelled())
- U.warn(log, "Failed to receive message on datagram socket: " + e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
deleted file mode 100644
index ef7dc06..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSource.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-/**
- * Interface representing time source for time processor.
- */
-public interface GridClockSource {
- /**
- * Gets current time in milliseconds past since 1 January, 1970.
- *
- * @return Current time in milliseconds.
- */
- public long currentTimeMillis();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 6c9ffe8..e69de29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_TIME_SYNC;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_HOST;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TIME_SERVER_PORT;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-
-/**
- * Time synchronization processor.
- */
-public class GridClockSyncProcessor extends GridProcessorAdapter {
- /** Maximum size for time sync history. */
- private static final int MAX_TIME_SYNC_HISTORY = 100;
-
- /** Time server instance. */
- private GridClockServer srv;
-
- /** Shutdown lock. */
- private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
-
- /** Stopping flag. */
- private volatile boolean stopping;
-
- /** Time coordinator thread. */
- private volatile TimeCoordinator timeCoord;
-
- /** Time delta history. Constructed on coordinator. */
- private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist =
- new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
-
- /** Last recorded. */
- private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot;
-
- /** Time source. */
- private GridClockSource clockSrc;
-
- /**
- * @param ctx Kernal context.
- */
- public GridClockSyncProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
- super.start(activeOnStart);
-
- clockSrc = ctx.timeSource();
-
- srv = new GridClockServer();
-
- srv.start(ctx);
-
- ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridClockDeltaSnapshotMessage;
-
- GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg;
-
- GridClockDeltaVersion ver = msg0.snapshotVersion();
-
- GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas());
-
- lastSnapshot = new T2<>(ver, snap);
-
- timeSyncHist.put(ver, snap);
- }
- });
-
- // We care only about node leave and fail events.
- ctx.event().addLocalEventListener(new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_JOINED;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
- checkLaunchCoordinator(discoEvt);
-
- TimeCoordinator timeCoord0 = timeCoord;
-
- if (timeCoord0 != null)
- timeCoord0.onDiscoveryEvent(discoEvt);
- }
- }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED);
-
- ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host());
- ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port());
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
- super.onKernalStart(activeOnStart);
-
- srv.afterStart();
-
- // Check at startup if this node is a fragmentizer coordinator.
- DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent();
-
- checkLaunchCoordinator(locJoinEvt);
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- rw.writeLock();
-
- try {
- stopping = false;
-
- if (timeCoord != null) {
- timeCoord.cancel();
-
- U.join(timeCoord, log);
-
- timeCoord = null;
- }
-
- if (srv != null)
- srv.beforeStop();
- }
- finally {
- rw.writeUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
-
- if (srv != null)
- srv.stop();
- }
-
- /**
- * Gets current time on local node.
- *
- * @return Current time in milliseconds.
- */
- private long currentTime() {
- return clockSrc.currentTimeMillis();
- }
-
- /**
- * @return Time sync history.
- */
- public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHistory() {
- return timeSyncHist;
- }
-
- /**
- * Callback from server for message receiving.
- *
- * @param msg Received message.
- * @param addr Remote node address.
- * @param port Remote node port.
- */
- public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) {
- long rcvTs = currentTime();
-
- if (!msg.originatingNodeId().equals(ctx.localNodeId())) {
- // We received time request from remote node, set current time and reply back.
- msg.replyTimestamp(rcvTs);
-
- try {
- srv.sendPacket(msg, addr, port);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send time server reply to remote node: " + msg, e);
- }
- }
- else
- timeCoord.onMessage(msg, rcvTs);
- }
-
- /**
- * Checks if local node is the oldest node in topology and starts time coordinator if so.
- *
- * @param discoEvt Discovery event.
- */
- private void checkLaunchCoordinator(DiscoveryEvent discoEvt) {
- rw.readLock();
-
- try {
- if (stopping)
- return;
-
- if (timeCoord == null) {
- long minNodeOrder = Long.MAX_VALUE;
-
- Collection<ClusterNode> nodes = discoEvt.topologyNodes();
-
- for (ClusterNode node : nodes) {
- if (node.order() < minNodeOrder)
- minNodeOrder = node.order();
- }
-
- ClusterNode locNode = ctx.discovery().localNode();
-
- if (locNode.order() == minNodeOrder) {
- if (log.isDebugEnabled())
- log.debug("Detected local node to be the eldest node in topology, starting time " +
- "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']');
-
- synchronized (this) {
- if (timeCoord == null && !stopping) {
- timeCoord = new TimeCoordinator(discoEvt);
-
- IgniteThread th = new IgniteThread(timeCoord);
-
- th.setPriority(Thread.MAX_PRIORITY);
-
- th.start();
- }
- }
- }
- }
- }
- finally {
- rw.readUnlock();
- }
- }
-
- /**
- * Gets time adjusted with time coordinator on given topology version.
- *
- * @param topVer Topology version.
- * @return Adjusted time.
- */
- public long adjustedTime(long topVer) {
- T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
-
- GridClockDeltaSnapshot snap;
-
- if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
- snap = fastSnap.get2();
- else {
- // Get last synchronized time on given topology version.
- Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
- new GridClockDeltaVersion(0, topVer + 1));
-
- snap = entry == null ? null : entry.getValue();
- }
-
- long now = clockSrc.currentTimeMillis();
-
- if (snap == null)
- return now;
-
- Long delta = snap.deltas().get(ctx.localNodeId());
-
- if (delta == null)
- delta = 0L;
-
- return now + delta;
- }
-
- /**
- * Publishes snapshot to topology.
- *
- * @param snapshot Snapshot to publish.
- * @param top Topology to send given snapshot to.
- */
- private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) {
- if (!rw.tryReadLock())
- return;
-
- try {
- lastSnapshot = new T2<>(snapshot.version(), snapshot);
-
- timeSyncHist.put(snapshot.version(), snapshot);
-
- for (ClusterNode n : top.topologyNodes()) {
- GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage(
- snapshot.version(), snapshot.deltas());
-
- try {
- ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().pingNodeNoError(n.id()))
- U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " +
- "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
- else if (log.isDebugEnabled())
- log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) " +
- "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
- }
- }
- }
- finally {
- rw.readUnlock();
- }
- }
-
- /**
- * Time coordinator thread.
- */
- private class TimeCoordinator extends GridWorker {
- /** Last discovery topology snapshot. */
- private volatile GridDiscoveryTopologySnapshot lastSnapshot;
-
- /** Snapshot being constructed. May be not null only on coordinator node. */
- private volatile GridClockDeltaSnapshot pendingSnapshot;
-
- /** Version counter. */
- private long verCnt = 1;
-
- /**
- * Time coordinator thread constructor.
- *
- * @param evt Discovery event on which this node became a coordinator.
- */
- protected TimeCoordinator(DiscoveryEvent evt) {
- super(ctx.igniteInstanceName(), "grid-time-coordinator", GridClockSyncProcessor.this.log);
-
- lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- GridDiscoveryTopologySnapshot top = lastSnapshot;
-
- if (log.isDebugEnabled())
- log.debug("Creating time sync snapshot for topology: " + top);
-
- GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot(
- new GridClockDeltaVersion(verCnt++, top.topologyVersion()),
- ctx.localNodeId(),
- top,
- ctx.config().getClockSyncSamples());
-
- pendingSnapshot = snapshot;
-
- while (!snapshot.ready()) {
- if (log.isDebugEnabled())
- log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds());
-
- for (UUID nodeId : snapshot.pendingNodeIds())
- requestTime(nodeId);
-
- if (log.isDebugEnabled())
- log.debug("Waiting for snapshot to be ready: " + snapshot);
-
- // Wait for all replies
- snapshot.awaitReady(1000);
- }
-
- // No more messages should be processed.
- pendingSnapshot = null;
-
- if (log.isDebugEnabled())
- log.debug("Collected time sync results: " + snapshot.deltas());
-
- publish(snapshot, top);
-
- synchronized (this) {
- if (top.topologyVersion() == lastSnapshot.topologyVersion())
- wait(ctx.config().getClockSyncFrequency());
- }
- }
- }
-
- /**
- * @param evt Discovery event.
- */
- public void onDiscoveryEvent(DiscoveryEvent evt) {
- if (log.isDebugEnabled())
- log.debug("Processing discovery event: " + evt);
-
- if (evt.type() == EventType.EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT)
- onNodeLeft(evt.eventNode().id());
-
- synchronized (this) {
- lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
-
- notifyAll();
- }
- }
-
- /**
- * @param msg Message received from remote node.
- * @param rcvTs Receive timestamp.
- */
- private void onMessage(GridClockMessage msg, long rcvTs) {
- GridClockDeltaSnapshot curr = pendingSnapshot;
-
- if (curr != null) {
- long delta = (msg.originatingTimestamp() + rcvTs) / 2 - msg.replyTimestamp();
-
- boolean needMore = curr.onDeltaReceived(msg.targetNodeId(), delta);
-
- if (needMore)
- requestTime(msg.targetNodeId());
- }
- }
-
- /**
- * Requests time from remote node.
- *
- * @param rmtNodeId Remote node ID.
- */
- private void requestTime(UUID rmtNodeId) {
- ClusterNode node = ctx.discovery().node(rmtNodeId);
-
- if (node != null) {
- InetAddress addr = node.attribute(ATTR_TIME_SERVER_HOST);
- int port = node.attribute(ATTR_TIME_SERVER_PORT);
-
- try {
- GridClockMessage req = new GridClockMessage(ctx.localNodeId(), rmtNodeId, currentTime(), 0);
-
- srv.sendPacket(req, addr, port);
- }
- catch (IgniteCheckedException e) {
- LT.error(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId +
- ", addr=" + addr + ", port=" + port + ']');
- }
- }
- else
- onNodeLeft(rmtNodeId);
- }
-
- /**
- * Node left callback.
- *
- * @param nodeId Left node ID.
- */
- private void onNodeLeft(UUID nodeId) {
- GridClockDeltaSnapshot curr = pendingSnapshot;
-
- if (curr != null)
- curr.onNodeLeft(nodeId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
deleted file mode 100644
index 77bc3eb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridJvmClockSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.clock;
-
-/**
- * JVM time source.
- */
-public class GridJvmClockSource implements GridClockSource {
- /** {@inheritDoc} */
- @Override public long currentTimeMillis() {
- return System.currentTimeMillis();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index e4cb0ce..102db96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -975,7 +974,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
ccfg.setAtomicityMode(cfg.getAtomicityMode());
ccfg.setNodeFilter(cfg.getNodeFilter());
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setAtomicWriteOrderMode(PRIMARY);
ccfg.setRebalanceMode(SYNC);
return ccfg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 3cf88d3..bb11b7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -25,7 +25,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -528,7 +527,6 @@ public class IgfsUtils {
*/
private static CacheConfiguration defaultCacheConfig() {
CacheConfiguration cfg = new CacheConfiguration();
- cfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY);
cfg.setAtomicityMode(TRANSACTIONAL);
cfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheMode(CacheMode.PARTITIONED);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 17a5a16..6a15b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -36,7 +36,6 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -140,7 +139,6 @@ public class PlatformConfigurationUtils {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setAtomicityMode(CacheAtomicityMode.fromOrdinal(in.readInt()));
- ccfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.fromOrdinal((byte)in.readInt()));
ccfg.setBackups(in.readInt());
ccfg.setCacheMode(CacheMode.fromOrdinal(in.readInt()));
ccfg.setCopyOnRead(in.readBoolean());
@@ -740,7 +738,6 @@ public class PlatformConfigurationUtils {
assert ccfg != null;
writeEnumInt(writer, ccfg.getAtomicityMode(), CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);
- writeEnumInt(writer, ccfg.getAtomicWriteOrderMode());
writer.writeInt(ccfg.getBackups());
writeEnumInt(writer, ccfg.getCacheMode(), CacheConfiguration.DFLT_CACHE_MODE);
writer.writeBoolean(ccfg.isCopyOnRead());
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 2023749..7d7d071 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9285,10 +9285,6 @@ public abstract class IgniteUtils {
off += 4;
- GridUnsafe.putLong(arr, off, drVer.globalTime());
-
- off += 8;
-
GridUnsafe.putLong(arr, off, drVer.order());
off += 8;
@@ -9302,10 +9298,6 @@ public abstract class IgniteUtils {
off += 4;
- GridUnsafe.putLong(arr, off, ver.globalTime());
-
- off += 8;
-
GridUnsafe.putLong(arr, off, ver.order());
off += 8;
@@ -9321,16 +9313,14 @@ public abstract class IgniteUtils {
public static GridCacheVersion readVersion(long ptr, boolean verEx) {
GridCacheVersion ver = new GridCacheVersion(GridUnsafe.getInt(ptr),
GridUnsafe.getInt(ptr + 4),
- GridUnsafe.getLong(ptr + 8),
- GridUnsafe.getLong(ptr + 16));
+ GridUnsafe.getLong(ptr + 8));
if (verEx) {
- ptr += 24;
+ ptr += 16;
ver = new GridCacheVersionEx(GridUnsafe.getInt(ptr),
GridUnsafe.getInt(ptr + 4),
GridUnsafe.getLong(ptr + 8),
- GridUnsafe.getLong(ptr + 16),
ver);
}
@@ -9352,15 +9342,11 @@ public abstract class IgniteUtils {
off += 4;
- long globalTime = GridUnsafe.getLong(arr, off);
-
- off += 8;
-
long order = GridUnsafe.getLong(arr, off);
off += 8;
- GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
+ GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, order);
if (verEx) {
topVer = GridUnsafe.getInt(arr, off);
@@ -9371,13 +9357,9 @@ public abstract class IgniteUtils {
off += 4;
- globalTime = GridUnsafe.getLong(arr, off);
-
- off += 8;
-
order = GridUnsafe.getLong(arr, off);
- ver = new GridCacheVersionEx(topVer, nodeOrderDrId, globalTime, order, ver);
+ ver = new GridCacheVersionEx(topVer, nodeOrderDrId, order, ver);
}
return ver;
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java
index 91a501c..391b120 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -53,9 +52,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject {
/** Cache atomicity mode. */
private CacheAtomicityMode atomicityMode;
- /** Cache atomicity write ordering mode. */
- private CacheAtomicWriteOrderMode atomicWriteOrderMode;
-
/** Eager ttl flag. */
private boolean eagerTtl;
@@ -148,7 +144,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject {
name = ccfg.getName();
mode = ccfg.getCacheMode();
atomicityMode = ccfg.getAtomicityMode();
- atomicWriteOrderMode = ccfg.getAtomicWriteOrderMode();
eagerTtl = ccfg.isEagerTtl();
writeSynchronizationMode = ccfg.getWriteSynchronizationMode();
invalidate = ccfg.isInvalidate();
@@ -202,10 +197,10 @@ public class VisorCacheConfiguration extends VisorDataTransferObject {
}
/**
- * @return Cache atomicity write ordering mode.
+ * @return Eager ttl flag
*/
- public CacheAtomicWriteOrderMode getAtomicWriteOrderMode() {
- return atomicWriteOrderMode;
+ public boolean eagerTtl() {
+ return eagerTtl;
}
/**
@@ -395,7 +390,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject {
U.writeString(out, name);
U.writeEnum(out, mode);
U.writeEnum(out, atomicityMode);
- U.writeEnum(out, atomicWriteOrderMode);
out.writeBoolean(eagerTtl);
U.writeEnum(out, writeSynchronizationMode);
out.writeBoolean(invalidate);
@@ -429,7 +423,6 @@ public class VisorCacheConfiguration extends VisorDataTransferObject {
name = U.readString(in);
mode = CacheMode.fromOrdinal(in.readByte());
atomicityMode = CacheAtomicityMode.fromOrdinal(in.readByte());
- atomicWriteOrderMode = CacheAtomicWriteOrderMode.fromOrdinal(in.readByte());
eagerTtl = in.readBoolean();
writeSynchronizationMode = CacheWriteSynchronizationMode.fromOrdinal(in.readByte());
invalidate = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java b/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java
index a5e83c1..aa84706 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java
@@ -413,8 +413,7 @@ public class BasicWarmupClosure implements IgniteInClosure<IgniteConfiguration>
return
F.eq(ccfg0.getCacheMode(), ccfg1.getCacheMode()) &&
F.eq(ccfg0.getBackups(), ccfg1.getBackups()) &&
- F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode()) &&
- F.eq(ccfg0.getAtomicWriteOrderMode(), ccfg1.getAtomicWriteOrderMode());
+ F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index ebd28d8..22124af 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -29,7 +29,6 @@ org.apache.ignite.binary.BinaryInvalidTypeException
org.apache.ignite.binary.BinaryObject
org.apache.ignite.binary.BinaryObjectException
org.apache.ignite.cache.CacheAtomicUpdateTimeoutException
-org.apache.ignite.cache.CacheAtomicWriteOrderMode
org.apache.ignite.cache.CacheAtomicityMode
org.apache.ignite.cache.CacheEntryEventSerializableFilter
org.apache.ignite.cache.CacheEntryProcessor
@@ -977,8 +976,8 @@ org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserCacheObjectByteArrayImpl
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserCacheObjectImpl
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserKeyCacheObjectImpl
-org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage
-org.apache.ignite.internal.processors.clock.GridClockDeltaVersion
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/example-cache-base.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/example-cache-base.xml b/modules/core/src/test/config/websession/example-cache-base.xml
index 20f103e..9654fab 100644
--- a/modules/core/src/test/config/websession/example-cache-base.xml
+++ b/modules/core/src/test/config/websession/example-cache-base.xml
@@ -65,7 +65,6 @@
<property name="name" value="partitioned_primary"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="ATOMIC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
<property name="backups" value="1"/>
</bean>
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-1.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/spring-cache-1.xml b/modules/core/src/test/config/websession/spring-cache-1.xml
index 9d049fa..f810282 100644
--- a/modules/core/src/test/config/websession/spring-cache-1.xml
+++ b/modules/core/src/test/config/websession/spring-cache-1.xml
@@ -40,8 +40,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -56,8 +54,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -72,8 +68,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="rebalanceMode" value="SYNC"/>
</bean>
</list>
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-2.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/spring-cache-2.xml b/modules/core/src/test/config/websession/spring-cache-2.xml
index 7545ec7..ec20ed9 100644
--- a/modules/core/src/test/config/websession/spring-cache-2.xml
+++ b/modules/core/src/test/config/websession/spring-cache-2.xml
@@ -40,8 +40,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -56,8 +54,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -72,8 +68,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="rebalanceMode" value="SYNC"/>
</bean>
</list>
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/config/websession/spring-cache-3.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/spring-cache-3.xml b/modules/core/src/test/config/websession/spring-cache-3.xml
index a0fb189..6996c40 100644
--- a/modules/core/src/test/config/websession/spring-cache-3.xml
+++ b/modules/core/src/test/config/websession/spring-cache-3.xml
@@ -40,8 +40,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -56,8 +54,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="backups" value="1"/>
<property name="rebalanceMode" value="SYNC"/>
@@ -72,8 +68,6 @@
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
- <property name="atomicWriteOrderMode" value="PRIMARY"/>
-
<property name="rebalanceMode" value="SYNC"/>
</bean>
</list>
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index dff827d..5e3b896 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterGroup;
@@ -74,9 +73,6 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.values;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -176,7 +172,6 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setName("nearCache");
- ccfg.setAtomicWriteOrderMode(PRIMARY);
final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
@@ -215,7 +210,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
catch (CacheException e) {
log.info("Expected exception: " + e);
- IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause();
+ IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
e0.reconnectFuture().get();
}
@@ -229,7 +224,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}));
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
assertEquals(0, disconnectLatch.getCount());
@@ -344,7 +340,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
tx.commit();
fail();
- } catch (IgniteClientDisconnectedException e) {
+ }
+ catch (IgniteClientDisconnectedException e) {
log.info("Expected error: " + e);
assertNotNull(e.reconnectFuture());
@@ -354,7 +351,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
txs.txStart();
fail();
- } catch (IgniteClientDisconnectedException e) {
+ }
+ catch (IgniteClientDisconnectedException e) {
log.info("Expected error: " + e);
assertNotNull(e.reconnectFuture());
@@ -412,8 +410,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
private void reconnectTransactionInProgress1(IgniteEx client,
final TransactionConcurrency txConcurrency,
final IgniteCache<Object, Object> cache)
- throws Exception
- {
+ throws Exception {
Ignite srv = clientRouter(client);
final TestTcpDiscoverySpi clientSpi = spi(client);
@@ -432,7 +429,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
info("Disconnected: " + evt);
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
reconnectLatch.countDown();
@@ -787,42 +785,34 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
int cnt = 0;
for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
- CacheAtomicWriteOrderMode[] writeOrders =
- atomicityMode == ATOMIC ? values() : new CacheAtomicWriteOrderMode[]{CLOCK};
-
- for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
- for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setAtomicityMode(atomicityMode);
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
- ccfg.setAtomicWriteOrderMode(writeOrder);
+ ccfg.setAtomicityMode(atomicityMode);
- ccfg.setName("cache-" + cnt++);
+ ccfg.setName("cache-" + cnt++);
- ccfg.setWriteSynchronizationMode(syncMode);
+ ccfg.setWriteSynchronizationMode(syncMode);
- if (syncMode != FULL_ASYNC) {
- Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ?
- GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
+ if (syncMode != FULL_ASYNC) {
+ Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ?
+ GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
- log.info("Test cache put [atomicity=" + atomicityMode +
- ", writeOrder=" + writeOrder +
- ", syncMode=" + syncMode + ']');
+ log.info("Test cache put [atomicity=" + atomicityMode +
+ ", syncMode=" + syncMode + ']');
- checkOperationInProgressFails(client, ccfg, cls, putOp);
+ checkOperationInProgressFails(client, ccfg, cls, putOp);
- client.destroyCache(ccfg.getName());
- }
+ client.destroyCache(ccfg.getName());
+ }
- log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
+ log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
- checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp);
+ checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp);
- checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp);
+ checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp);
- client.destroyCache(ccfg.getName());
- }
+ client.destroyCache(ccfg.getName());
}
}
}
@@ -1121,7 +1111,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
@Override public Void call() throws Exception {
int idx = nodeIdx.incrementAndGet();
- for (int i = 0; i < 25; i++) {
+ for (int i = 0; i < 25; i++) {
startGrid(idx);
stopGrid(idx);
@@ -1260,27 +1250,32 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
/**
*
*/
- static class TestClass1 implements Serializable {}
+ static class TestClass1 implements Serializable {
+ }
/**
*
*/
- static class TestClass2 implements Serializable {}
+ static class TestClass2 implements Serializable {
+ }
/**
*
*/
- static class TestClass3 implements Serializable {}
+ static class TestClass3 implements Serializable {
+ }
/**
*
*/
- static class TestClass4 implements Serializable {}
+ static class TestClass4 implements Serializable {
+ }
/**
*
*/
- static class TestClass5 implements Serializable {}
+ static class TestClass5 implements Serializable {
+ }
/**
* @param client Client.
@@ -1293,8 +1288,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
final CacheConfiguration<Object, Object> ccfg,
Class<?> msgToBlock,
final IgniteInClosure<IgniteCache<Object, Object>> c)
- throws Exception
- {
+ throws Exception {
Ignite srv = clientRouter(client);
TestTcpDiscoverySpi srvSpi = spi(srv);
@@ -1381,8 +1375,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
String cacheName,
boolean cacheExists,
boolean clientCache,
- boolean clientNear)
- {
+ boolean clientNear) {
GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery();
GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery();
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
index a4c59ea..991735b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
@@ -42,7 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -72,7 +71,6 @@ public class CacheAtomicSingleMessageCountSelfTest extends GridCommonAbstractTes
cCfg.setCacheMode(PARTITIONED);
cCfg.setBackups(1);
cCfg.setWriteSynchronizationMode(FULL_SYNC);
- cCfg.setAtomicWriteOrderMode(PRIMARY);
cfg.setCacheConfiguration(cCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java
index 2017365..efaaee8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java
@@ -34,7 +34,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -240,7 +239,6 @@ public abstract class CacheEnumOperationsAbstractTest extends GridCommonAbstract
ccfg.setAtomicityMode(atomicityMode);
ccfg.setCacheMode(cacheMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setAtomicWriteOrderMode(PRIMARY);
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
index 5b7769e..7d49c11 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
@@ -60,7 +60,6 @@ import org.jetbrains.annotations.NotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -729,7 +728,6 @@ public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCo
ccfg.setAtomicityMode(atomicityMode);
ccfg.setCacheMode(cacheMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setAtomicWriteOrderMode(PRIMARY);
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
http://git-wip-us.apache.org/repos/asf/ignite/blob/247282f3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
index cdd9072..70f2fc5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
@@ -26,7 +26,6 @@ import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.store.CacheStore;
@@ -105,7 +104,6 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(atomicityMode());
- ccfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY);
ccfg.setBackups(1);
ccfg.setWriteSynchronizationMode(FULL_SYNC);