You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 19:39:56 UTC
[1/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader ->
IgniteDataStreamer.java + impl
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-394 [created] 9b33b6510
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
new file mode 100644
index 0000000..3f94752
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data loader implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+ /** Isolated updater. */
+ private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+ /** Cache updater. */
+ private Updater<K, V> updater = ISOLATED_UPDATER;
+
+ /** */
+ private byte[] updaterBytes;
+
+ /** Max remap count before issuing an error. */
+ private static final int DFLT_MAX_REMAP_CNT = 32;
+
+ /** Log reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Cache name ({@code null} for default cache). */
+ private final String cacheName;
+
+ /** Portable enabled flag. */
+ private final boolean portableEnabled;
+
+ /**
+ * If {@code true} then data will be transferred in compact format (only keys and values).
+ * Otherwise full map entry will be transferred (this is requires by DR internal logic).
+ */
+ private final boolean compact;
+
+ /** Per-node buffer size. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+ /** */
+ private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+ /** */
+ private long autoFlushFreq;
+
+ /** Mapping. */
+ @GridToStringInclude
+ private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Discovery listener. */
+ private final GridLocalEventListener discoLsnr;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** Communication topic for responses. */
+ private final Object topic;
+
+ /** */
+ private byte[] topicBytes;
+
+ /** {@code True} if data loader has been cancelled. */
+ private volatile boolean cancelled;
+
+ /** Active futures of this data loader. */
+ @GridToStringInclude
+ private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+ /** Closure to remove from active futures. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ boolean rmv = activeFuts.remove(t);
+
+ assert rmv;
+ }
+ };
+
+ /** Job peer deploy aware. */
+ private volatile GridPeerDeployAware jobPda;
+
+ /** Deployment class. */
+ private Class<?> depCls;
+
+ /** Future to track loading finish. */
+ private final GridFutureAdapter<?> fut;
+
+ /** Public API future to track loading finish. */
+ private final IgniteFuture<?> publicFut;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** */
+ private volatile long lastFlushTime = U.currentTimeMillis();
+
+ /** */
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+ /**
+ * @param ctx Grid kernal context.
+ * @param cacheName Cache name.
+ * @param flushQ Flush queue.
+ * @param compact If {@code true} data is transferred in compact mode (only keys and values).
+ * Otherwise full map entry will be transferred (this is required by DR internal logic).
+ */
+ public IgniteDataStreamerImpl(
+ final GridKernalContext ctx,
+ @Nullable final String cacheName,
+ DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
+ boolean compact
+ ) {
+ assert ctx != null;
+
+ this.ctx = ctx;
+ this.cacheName = cacheName;
+ this.flushQ = flushQ;
+ this.compact = compact;
+
+ log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
+
+ ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+ portableEnabled = ctx.portable().portableEnabled(node, cacheName);
+
+ discoLsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID id = discoEvt.eventNode().id();
+
+ // Remap regular mappings.
+ final Buffer buf = bufMappings.remove(id);
+
+ if (buf != null) {
+ // Only async notification is possible since
+ // discovery thread may be trapped otherwise.
+ ctx.closure().callLocalSafe(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ buf.onNodeLeft();
+
+ return null;
+ }
+ },
+ true /* system pool */
+ );
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ // Generate unique topic for this loader.
+ topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+ ctx.io().addMessageListener(topic, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof GridDataLoadResponse;
+
+ GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf != null)
+ buf.onResponse(res);
+
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug("Added response listener within topic: " + topic);
+
+ fut = new GridDataLoaderFuture(ctx, this);
+
+ publicFut = new IgniteFutureImpl<>(fut);
+ }
+
+ /**
+ * Enters busy lock.
+ */
+ private void enterBusy() {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Data loader has been closed.");
+ }
+
+ /**
+ * Leaves busy lock.
+ */
+ private void leaveBusy() {
+ busyLock.leaveBusy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> future() {
+ return publicFut;
+ }
+
+ /**
+ * @return Internal future.
+ */
+ public IgniteInternalFuture<?> internalFuture() {
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deployClass(Class<?> depCls) {
+ this.depCls = depCls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updater(Updater<K, V> updater) {
+ A.notNull(updater, "updater");
+
+ this.updater = updater;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowOverwrite() {
+ return updater != ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void allowOverwrite(boolean allow) {
+ if (allow == allowOverwrite())
+ return;
+
+ ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IgniteException("Failed to get node for cache: " + cacheName);
+
+ updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipStore(boolean skipStore) {
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public String cacheName() {
+ return cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeBufferSize() {
+ return bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void perNodeBufferSize(int bufSize) {
+ A.ensure(bufSize > 0, "bufSize > 0");
+
+ this.bufSize = bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeParallelLoadOperations() {
+ return parallelOps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void perNodeParallelLoadOperations(int parallelOps) {
+ this.parallelOps = parallelOps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long autoFlushFrequency() {
+ return autoFlushFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void autoFlushFrequency(long autoFlushFreq) {
+ A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+ long old = this.autoFlushFreq;
+
+ if (autoFlushFreq != old) {
+ this.autoFlushFreq = autoFlushFreq;
+
+ if (autoFlushFreq != 0 && old == 0)
+ flushQ.add(this);
+ else if (autoFlushFreq == 0)
+ flushQ.remove(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+ A.notNull(entries, "entries");
+
+ return addData(entries.entrySet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+ A.notEmpty(entries, "entries");
+
+ enterBusy();
+
+ try {
+ GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+
+ resFut.listenAsync(rmvActiveFut);
+
+ activeFuts.add(resFut);
+
+ Collection<K> keys = null;
+
+ if (entries.size() > 1) {
+ keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+ for (Map.Entry<K, V> entry : entries)
+ keys.add(entry.getKey());
+ }
+
+ load0(entries, resFut, keys, 0);
+
+ return new IgniteFutureImpl<>(resFut);
+ }
+ catch (IgniteException e) {
+ return new IgniteFinishedFutureImpl<>(ctx, e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+ A.notNull(entry, "entry");
+
+ return addData(F.asList(entry));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(K key, V val) {
+ A.notNull(key, "key");
+
+ return addData(new Entry0<>(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> removeData(K key) {
+ return addData(key, null);
+ }
+
+ /**
+ * @param entries Entries.
+ * @param resFut Result future.
+ * @param activeKeys Active keys.
+ * @param remaps Remaps count.
+ */
+ private void load0(
+ Collection<? extends Map.Entry<K, V>> entries,
+ final GridFutureAdapter<Object> resFut,
+ @Nullable final Collection<K> activeKeys,
+ final int remaps
+ ) {
+ assert entries != null;
+
+ Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+ boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ List<ClusterNode> nodes;
+
+ try {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ if (initPda) {
+ jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+
+ initPda = false;
+ }
+
+ nodes = nodes(key);
+ }
+ catch (IgniteCheckedException e) {
+ resFut.onDone(e);
+
+ return;
+ }
+
+ if (F.isEmpty(nodes)) {
+ resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+ "(no nodes with cache found in topology) [infos=" + entries.size() +
+ ", cacheName=" + cacheName + ']'));
+
+ return;
+ }
+
+ for (ClusterNode node : nodes) {
+ Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+ if (col == null)
+ mappings.put(node, col = new ArrayList<>());
+
+ col.add(entry);
+ }
+ }
+
+ for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+ final UUID nodeId = e.getKey().id();
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf == null) {
+ Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+ if (old != null)
+ buf = old;
+ }
+
+ final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ try {
+ t.get();
+
+ if (activeKeys != null) {
+ for (Map.Entry<K, V> e : entriesForNode)
+ activeKeys.remove(e.getKey());
+
+ if (activeKeys.isEmpty())
+ resFut.onDone();
+ }
+ else {
+ assert entriesForNode.size() == 1;
+
+ // That has been a single key,
+ // so complete result future right away.
+ resFut.onDone();
+ }
+ }
+ catch (IgniteCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ if (cancelled) {
+ resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
+ IgniteDataStreamerImpl.this, e1));
+ }
+ else if (remaps + 1 > maxRemapCnt) {
+ resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+ + remaps), e1);
+ }
+ else
+ load0(entriesForNode, resFut, activeKeys, remaps + 1);
+ }
+ }
+ };
+
+ GridFutureAdapter<?> f;
+
+ try {
+ f = buf.update(entriesForNode, lsnr);
+ }
+ catch (IgniteInterruptedCheckedException e1) {
+ resFut.onDone(e1);
+
+ return;
+ }
+
+ if (ctx.discovery().node(nodeId) == null) {
+ if (bufMappings.remove(nodeId, buf))
+ buf.onNodeLeft();
+
+ if (f != null)
+ f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + nodeId));
+ }
+ }
+ }
+
+ /**
+ * @param key Key to map.
+ * @return Nodes to send requests to.
+ * @throws IgniteCheckedException If failed.
+ */
+ private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+ GridAffinityProcessor aff = ctx.affinity();
+
+ return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
+ Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+ }
+
+ /**
+ * Performs flush.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doFlush() throws IgniteCheckedException {
+ lastFlushTime = U.currentTimeMillis();
+
+ List<IgniteInternalFuture> activeFuts0 = null;
+
+ int doneCnt = 0;
+
+ for (IgniteInternalFuture<?> f : activeFuts) {
+ if (!f.isDone()) {
+ if (activeFuts0 == null)
+ activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+ activeFuts0.add(f);
+ }
+ else {
+ f.get();
+
+ doneCnt++;
+ }
+ }
+
+ if (activeFuts0 == null || activeFuts0.isEmpty())
+ return;
+
+ while (true) {
+ Queue<IgniteInternalFuture<?>> q = null;
+
+ for (Buffer buf : bufMappings.values()) {
+ IgniteInternalFuture<?> flushFut = buf.flush();
+
+ if (flushFut != null) {
+ if (q == null)
+ q = new ArrayDeque<>(bufMappings.size() * 2);
+
+ q.add(flushFut);
+ }
+ }
+
+ if (q != null) {
+ assert !q.isEmpty();
+
+ boolean err = false;
+
+ for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to flush buffer: " + e);
+
+ err = true;
+ }
+ }
+
+ if (err)
+ // Remaps needed - flush buffers.
+ continue;
+ }
+
+ doneCnt = 0;
+
+ for (int i = 0; i < activeFuts0.size(); i++) {
+ IgniteInternalFuture f = activeFuts0.get(i);
+
+ if (f == null)
+ doneCnt++;
+ else if (f.isDone()) {
+ f.get();
+
+ doneCnt++;
+
+ activeFuts0.set(i, null);
+ }
+ else
+ break;
+ }
+
+ if (doneCnt == activeFuts0.size())
+ return;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void flush() throws IgniteException {
+ enterBusy();
+
+ try {
+ doFlush();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * Flushes every internal buffer if buffer was flushed before passed in
+ * threshold.
+ * <p>
+ * Does not wait for result and does not fail on errors assuming that this method
+ * should be called periodically.
+ */
+ @Override public void tryFlush() throws IgniteInterruptedException {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ for (Buffer buf : bufMappings.values())
+ buf.flush();
+
+ lastFlushTime = U.currentTimeMillis();
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @throws IgniteException If failed.
+ */
+ @Override public void close(boolean cancel) throws IgniteException {
+ try {
+ closeEx(cancel);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void closeEx(boolean cancel) throws IgniteCheckedException {
+ if (!closed.compareAndSet(false, true))
+ return;
+
+ busyLock.block();
+
+ if (log.isDebugEnabled())
+ log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']');
+
+ IgniteCheckedException e = null;
+
+ try {
+ // Assuming that no methods are called on this loader after this method is called.
+ if (cancel) {
+ cancelled = true;
+
+ for (Buffer buf : bufMappings.values())
+ buf.cancelAll();
+ }
+ else
+ doFlush();
+
+ ctx.event().removeLocalEventListener(discoLsnr);
+
+ ctx.io().removeMessageListener(topic);
+ }
+ catch (IgniteCheckedException e0) {
+ e = e0;
+ }
+
+ fut.onDone(null, e);
+
+ if (e != null)
+ throw e;
+ }
+
+ /**
+ * @return {@code true} If the loader is closed.
+ */
+ boolean isClosed() {
+ return fut.isDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ close(false);
+ }
+
+ /**
+ * @return Max remap count.
+ */
+ public int maxRemapCount() {
+ return maxRemapCnt;
+ }
+
+ /**
+ * @param maxRemapCnt New max remap count.
+ */
+ public void maxRemapCount(int maxRemapCnt) {
+ this.maxRemapCnt = maxRemapCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteDataStreamerImpl.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDelay(TimeUnit unit) {
+ return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Next flush time.
+ */
+ private long nextFlushTime() {
+ return lastFlushTime + autoFlushFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(Delayed o) {
+ return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
+ }
+
+ /**
+ *
+ */
+ private class Buffer {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Active futures. */
+ private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+ /** Buffered entries. */
+ private List<Map.Entry<K, V>> entries;
+
+ /** */
+ @GridToStringExclude
+ private GridFutureAdapter<Object> curFut;
+
+ /** Local node flag. */
+ private final boolean isLocNode;
+
+ /** ID generator. */
+ private final AtomicLong idGen = new AtomicLong();
+
+ /** Active futures. */
+ private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+ /** */
+ private final Semaphore sem;
+
+ /** Closure to signal on task finish. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
+
+ /**
+ * @param node Node.
+ */
+ Buffer(ClusterNode node) {
+ assert node != null;
+
+ this.node = node;
+
+ locFuts = new GridConcurrentHashSet<>();
+ reqs = new ConcurrentHashMap8<>();
+
+ // Cache local node flag.
+ isLocNode = node.equals(ctx.discovery().localNode());
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+
+ sem = new Semaphore(parallelOps);
+ }
+
+ /**
+ * @param newEntries Infos.
+ * @param lsnr Listener for the operation future.
+ * @throws IgniteInterruptedCheckedException If failed.
+ * @return Future for operation.
+ */
+ @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+
+ curFut0.listenAsync(lsnr);
+
+ for (Map.Entry<K, V> entry : newEntries)
+ entries.add(entry);
+
+ if (entries.size() >= bufSize) {
+ entries0 = entries;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+ }
+ }
+
+ if (entries0 != null) {
+ submit(entries0, curFut0);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this));
+ }
+
+ return curFut0;
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<Map.Entry<K, V>> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /**
+ * @return Future if any submitted.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+ */
+ @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0 = null;
+
+ synchronized (this) {
+ if (!entries.isEmpty()) {
+ entries0 = entries;
+ curFut0 = curFut;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+ }
+ }
+
+ if (entries0 != null)
+ submit(entries0, curFut0);
+
+ // Create compound future for this flush.
+ GridCompoundFuture<Object, Object> res = null;
+
+ for (IgniteInternalFuture<Object> f : locFuts) {
+ if (res == null)
+ res = new GridCompoundFuture<>(ctx);
+
+ res.add(f);
+ }
+
+ for (IgniteInternalFuture<Object> f : reqs.values()) {
+ if (res == null)
+ res = new GridCompoundFuture<>(ctx);
+
+ res.add(f);
+ }
+
+ if (res != null)
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Increments active tasks count.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+ */
+ private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+ U.acquire(sem);
+ }
+
+ /**
+ * @param f Future that finished.
+ */
+ private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+ assert f != null;
+
+ sem.release();
+ }
+
+ /**
+ * @param entries Entries to submit.
+ * @param curFut Current future.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+ throws IgniteInterruptedCheckedException {
+ assert entries != null;
+ assert !entries.isEmpty();
+ assert curFut != null;
+
+ incrementActiveTasks();
+
+ IgniteInternalFuture<Object> fut;
+
+ if (isLocNode) {
+ fut = ctx.closure().callLocalSafe(
+ new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+ locFuts.add(fut);
+
+ fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ try {
+ boolean rmv = locFuts.remove(t);
+
+ assert rmv;
+
+ curFut.onDone(t.get());
+ }
+ catch (IgniteCheckedException e) {
+ curFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ byte[] entriesBytes;
+
+ try {
+ if (compact) {
+ entriesBytes = ctx.config().getMarshaller()
+ .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+ }
+ else
+ entriesBytes = ctx.config().getMarshaller().marshal(entries);
+
+ if (updaterBytes == null) {
+ assert updater != null;
+
+ updaterBytes = ctx.config().getMarshaller().marshal(updater);
+ }
+
+ if (topicBytes == null)
+ topicBytes = ctx.config().getMarshaller().marshal(topic);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal (request will not be sent).", e);
+
+ return;
+ }
+
+ GridDeployment dep = null;
+ GridPeerDeployAware jobPda0 = null;
+
+ if (ctx.deploy().enabled()) {
+ try {
+ jobPda0 = jobPda;
+
+ assert jobPda0 != null;
+
+ dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+ GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+ if (cache != null)
+ cache.context().deploy().onEnter();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+ return;
+ }
+
+ if (dep == null)
+ U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+ }
+
+ long reqId = idGen.incrementAndGet();
+
+ fut = curFut;
+
+ reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+ GridDataLoadRequest req = new GridDataLoadRequest(
+ reqId,
+ topicBytes,
+ cacheName,
+ updaterBytes,
+ entriesBytes,
+ true,
+ skipStore,
+ dep != null ? dep.deployMode() : null,
+ dep != null ? jobPda0.deployClass().getName() : null,
+ dep != null ? dep.userVersion() : null,
+ dep != null ? dep.participants() : null,
+ dep != null ? dep.classLoaderId() : null,
+ dep == null);
+
+ try {
+ ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+ ((GridFutureAdapter<Object>)fut).onDone(e);
+ else
+ ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+ "request (node has left): " + node.id()));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ void onNodeLeft() {
+ assert !isLocNode;
+ assert bufMappings.get(node.id()) != this;
+
+ if (log.isDebugEnabled())
+ log.debug("Forcibly completing futures (node has left): " + node.id());
+
+ Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + node.id());
+
+ for (GridFutureAdapter<Object> f : reqs.values())
+ f.onDone(e);
+
+ // Make sure to complete current future.
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+ }
+
+ curFut0.onDone(e);
+ }
+
+ /**
+ * @param res Response.
+ */
+ void onResponse(GridDataLoadResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+ if (f == null) {
+ if (log.isDebugEnabled())
+ log.debug("Future for request has not been found: " + res.requestId());
+
+ return;
+ }
+
+ Throwable err = null;
+
+ byte[] errBytes = res.errorBytes();
+
+ if (errBytes != null) {
+ try {
+ GridPeerDeployAware jobPda0 = jobPda;
+
+ err = ctx.config().getMarshaller().unmarshal(
+ errBytes,
+ jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+ return;
+ }
+ }
+
+ f.onDone(null, err);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+ }
+
+ /**
+ *
+ */
+ void cancelAll() {
+ IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this);
+
+ for (IgniteInternalFuture<?> f : locFuts) {
+ try {
+ f.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to cancel mini-future.", e);
+ }
+ }
+
+ for (GridFutureAdapter<?> f : reqs.values())
+ f.onDone(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ int size;
+
+ synchronized (this) {
+ size = entries.size();
+ }
+
+ return S.toString(Buffer.class, this,
+ "entriesCnt", size,
+ "locFutsSize", locFuts.size(),
+ "reqsSize", reqs.size());
+ }
+ }
+
+ /**
+ * Data loader peer-deploy aware.
+ */
+ private class DataLoaderPda implements GridPeerDeployAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Deploy class. */
+ private Class<?> cls;
+
+ /** Class loader. */
+ private ClassLoader ldr;
+
+ /** Collection of objects to detect deploy class and class loader. */
+ private Collection<Object> objs;
+
+ /**
+ * Constructs data loader peer-deploy aware.
+ *
+ * @param objs Collection of objects to detect deploy class and class loader.
+ */
+ private DataLoaderPda(Object... objs) {
+ this.objs = Arrays.asList(objs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<?> deployClass() {
+ if (cls == null) {
+ Class<?> cls0 = null;
+
+ if (depCls != null)
+ cls0 = depCls;
+ else {
+ for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+ Object o = it.next();
+
+ if (o != null)
+ cls0 = U.detectClass(o);
+ }
+
+ if (cls0 == null || U.isJdk(cls0))
+ cls0 = IgniteDataStreamerImpl.class;
+ }
+
+ assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+ cls = cls0;
+ }
+
+ return cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClassLoader classLoader() {
+ if (ldr == null) {
+ ClassLoader ldr0 = deployClass().getClassLoader();
+
+ // Safety.
+ if (ldr0 == null)
+ ldr0 = U.gridClassLoader();
+
+ assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+
+ ldr = ldr0;
+ }
+
+ return ldr;
+ }
+ }
+
+ /**
+ * Entry.
+ */
+ private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private K key;
+
+ /** */
+ private V val;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ */
+ private Entry0(K key, @Nullable V val) {
+ assert key != null;
+
+ this.key = key;
+ this.val = val;
+ }
+
+ /**
+ * For {@link Externalizable}.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public Entry0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public K getKey() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V setValue(V val) {
+ V old = this.val;
+
+ this.val = val;
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(key);
+ out.writeObject(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ key = (K)in.readObject();
+ val = (V)in.readObject();
+ }
+ }
+
+ /**
+ * Wrapper list with special compact serialization of map entries.
+ */
+ private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped delegate. */
+ private Collection<Map.Entry<K, V>> delegate;
+
+ /** Optional portable processor for converting values. */
+ private GridPortableProcessor portable;
+
+ /**
+ * @param delegate Delegate.
+ * @param portable Portable processor.
+ */
+ private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+ this.delegate = delegate;
+ this.portable = portable;
+ }
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public Entries0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Entry<K, V>> iterator() {
+ return delegate.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return delegate.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(delegate.size());
+
+ boolean portableEnabled = portable != null;
+
+ for (Map.Entry<K, V> entry : delegate) {
+ if (portableEnabled) {
+ out.writeObject(portable.marshalToPortable(entry.getKey()));
+ out.writeObject(portable.marshalToPortable(entry.getValue()));
+ }
+ else {
+ out.writeObject(entry.getKey());
+ out.writeObject(entry.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ int sz = in.readInt();
+
+ delegate = new ArrayList<>(sz);
+
+ for (int i = 0; i < sz; i++) {
+ Object k = in.readObject();
+ Object v = in.readObject();
+
+ delegate.add(new Entry0<>((K)k, (V)v));
+ }
+ }
+ }
+
+ /**
+ * Isolated updater which only loads entry initial value.
+ */
+ private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+ GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+
+ if (internalCache.isNear())
+ internalCache = internalCache.context().near().dht();
+
+ GridCacheContext<K, V> cctx = internalCache.context();
+
+ long topVer = cctx.affinity().affinityTopologyVersion();
+
+ GridCacheVersion ver = cctx.versions().next(topVer);
+
+ boolean portable = cctx.portableEnabled();
+
+ for (Map.Entry<K, V> e : entries) {
+ try {
+ K key = e.getKey();
+ V val = e.getValue();
+
+ if (portable) {
+ key = (K)cctx.marshalToPortable(key);
+ val = (V)cctx.marshalToPortable(val);
+ }
+
+ GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
+
+ entry.unswap(true, false);
+
+ entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
+ GridDrType.DR_LOAD);
+
+ cctx.evicts().touch(entry, topVer);
+ }
+ catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+ // No-op.
+ }
+ catch (IgniteCheckedException ex) {
+ IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+ U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
index b8cfe77..e34eb16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -31,7 +31,7 @@ import java.util.*;
/**
* Data center replication cache updater for data loader.
*/
-public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K, V> {
+public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e960422..49b78f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -302,8 +302,8 @@ public class IgfsDataManager extends IgfsManager {
*
* @return New instance of data loader.
*/
- private IgniteDataLoader<IgfsBlockKey, byte[]> dataLoader() {
- IgniteDataLoader<IgfsBlockKey, byte[]> ldr =
+ private IgniteDataStreamer<IgfsBlockKey, byte[]> dataLoader() {
+ IgniteDataStreamer<IgfsBlockKey, byte[]> ldr =
igfsCtx.kernalContext().<IgfsBlockKey, byte[]>dataLoad().dataLoader(dataCachePrj.name());
IgfsConfiguration cfg = igfsCtx.configuration();
@@ -641,7 +641,7 @@ public class IgfsDataManager extends IgfsManager {
", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ']');
try {
- try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+ try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader()) {
for (long idx = startIdx; idx <= endIdx; idx++) {
ldr.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(),
idx));
@@ -667,7 +667,7 @@ public class IgfsDataManager extends IgfsManager {
long endIdx = range.endOffset() / fileInfo.blockSize();
try {
- try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+ try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader()) {
long bytesProcessed = 0;
for (long idx = startIdx; idx <= endIdx; idx++) {
@@ -1705,7 +1705,7 @@ public class IgfsDataManager extends IgfsManager {
break;
}
- IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader();
+ IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader();
try {
IgfsFileMap map = fileInfo.fileMap();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
index 4f5da0b..b60b76b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
@@ -99,7 +99,7 @@ public abstract class IgniteTxConsistencyRestartAbstractSelfTest extends GridCom
public void testTxConsistency() throws Exception {
startGridsMultiThreaded(GRID_CNT);
- IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null);
+ IgniteDataStreamer<Object, Object> ldr = grid(0).dataLoader(null);
for (int i = 0; i < RANGE; i++) {
ldr.addData(i, 0);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
index 8950f45..414fc42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
@@ -140,7 +140,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
* @param g Grid.
*/
private static void realTimePopulate(final Ignite g) {
- try (IgniteDataLoader<Integer, Long> ldr = g.dataLoader(null)) {
+ try (IgniteDataStreamer<Integer, Long> ldr = g.dataLoader(null)) {
// Sets max values to 1 so cache metrics have correct values.
ldr.perNodeParallelLoadOperations(1);
@@ -155,7 +155,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
/**
* Increments value for key.
*/
- private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+ private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
/** */
private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
@Override public Void process(MutableEntry<Integer, Long> e, Object... args) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
index ea8f060..ecb2411 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
@@ -108,7 +108,7 @@ public class GridCacheLruNearEvictionPolicySelfTest extends GridCommonAbstractTe
info("Inserting " + cnt + " keys to cache.");
- try (IgniteDataLoader<Integer, String> ldr = grid(0).dataLoader(null)) {
+ try (IgniteDataStreamer<Integer, String> ldr = grid(0).dataLoader(null)) {
for (int i = 0; i < cnt; i++)
ldr.addData(i, Integer.toString(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
index a381082..5ac6dae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
@@ -141,7 +141,7 @@ public class GridCacheNearOnlyLruNearEvictionPolicySelfTest extends GridCommonAb
info("Inserting " + cnt + " keys to cache.");
- try (IgniteDataLoader<Integer, String> ldr = grid(1).dataLoader(null)) {
+ try (IgniteDataStreamer<Integer, String> ldr = grid(1).dataLoader(null)) {
for (int i = 0; i < cnt; i++)
ldr.addData(i, Integer.toString(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
index e433856..501e56c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.dataload;
import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -88,7 +87,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
Ignite g4 = grid(4);
- IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null);
+ IgniteDataStreamer<Object, Object> dataLdr = g4.dataLoader(null);
dataLdr.perNodeBufferSize(32);
@@ -135,7 +134,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
else
fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
- IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null);
+ IgniteDataStreamer<Integer, String> dataLdr = g0.dataLoader(null);
Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
index 1945912..fc1a1cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
@@ -137,7 +137,7 @@ public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest {
Ignite ignite = startGrid();
- final IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(null);
+ final IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(null);
ldr.perNodeBufferSize(8192);
ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
index a666423..5959957 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.dataload;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.GridCache;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.spi.discovery.tcp.*;
@@ -37,7 +36,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;
-import javax.cache.Cache;
+import javax.cache.*;
import javax.cache.configuration.*;
import java.util.*;
import java.util.concurrent.*;
@@ -177,7 +176,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
Ignite g2 = startGrid(2);
startGrid(3);
- final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null);
ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
@@ -219,7 +218,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
assertEquals(total, s2 + s3);
- final IgniteDataLoader<Integer, Integer> rmvLdr = g2.dataLoader(null);
+ final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataLoader(null);
rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
@@ -299,7 +298,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
final int cnt = 40_000;
final int threads = 10;
- try (final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null)) {
+ try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null)) {
final AtomicInteger idxGen = new AtomicInteger();
IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
@@ -359,7 +358,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
- IgniteDataLoader<Object, Object> dataLdr = g1.dataLoader(null);
+ IgniteDataStreamer<Object, Object> dataLdr = g1.dataLoader(null);
for (int i = 0, size = arrays.size(); i < 1000; i++) {
Object arr = arrays.get(i % size);
@@ -417,7 +416,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
Ignite g1 = grid(1);
// Get and configure loader.
- final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null);
ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>individual());
ldr.perNodeBufferSize(2);
@@ -519,7 +518,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
try {
Ignite g1 = startGrid(1);
- IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null);
+ IgniteDataStreamer<Object, Object> ldr = g1.dataLoader(null);
ldr.close(false);
@@ -677,7 +676,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
final IgniteCache<Integer, Integer> c = g.jcache(null);
- final IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+ final IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
ldr.perNodeBufferSize(10);
@@ -729,7 +728,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
IgniteCache<Integer, Integer> c = g.jcache(null);
- IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
ldr.perNodeBufferSize(10);
@@ -776,7 +775,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
assertTrue(c.localSize() == 0);
- IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
ldr.perNodeBufferSize(10);
ldr.autoFlushFrequency(3000);
@@ -821,7 +820,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
for (int i = 0; i < 1000; i++)
storeMap.put(i, i);
- try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataLoader(null)) {
ldr.allowOverwrite(true);
assertFalse(ldr.skipStore());
@@ -839,7 +838,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
for (int i = 1000; i < 2000; i++)
assertEquals(i, storeMap.get(i));
- try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataLoader(null)) {
ldr.allowOverwrite(true);
ldr.skipStore(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
index e8ffedf..769f201 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
@@ -180,7 +180,7 @@ public class GridTestMain {
ExecutorCompletionService<Object> execSvc =
new ExecutorCompletionService<>(Executors.newFixedThreadPool(numThreads));
- try (IgniteDataLoader<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) {
+ try (IgniteDataStreamer<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) {
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
index 8615e10..4ec0d82 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
@@ -40,7 +40,7 @@ public class GridGcTimeoutTest {
public static void main(String[] args) {
Ignite g = G.start(U.resolveIgniteUrl(CFG_PATH));
- IgniteDataLoader<Long, String> ldr = g.dataLoader(null);
+ IgniteDataStreamer<Long, String> ldr = g.dataLoader(null);
ldr.perNodeBufferSize(16 * 1024);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
index 4e6d9b5..50c7eba 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
@@ -33,7 +33,7 @@ public class GridContinuousMapperLoadTest1 {
try (Ignite g = G.start("examples/config/example-cache.xml")) {
int max = 30000;
- IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated");
+ IgniteDataStreamer<Integer, TestObject> ldr = g.dataLoader("replicated");
for (int i = 0; i < max; i++)
ldr.addData(i, new TestObject(i, "Test object: " + i));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
index c9ba9ab..c6f364a 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
@@ -57,7 +57,7 @@ public class GridContinuousMapperLoadTest2 {
try {
int max = 20000;
- IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated");
+ IgniteDataStreamer<Integer, TestObject> ldr = g.dataLoader("replicated");
for (int i = 0; i < max; i++)
ldr.addData(i, new TestObject(i, "Test object: " + i));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 58478d3..d4dcf22 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.*;
import org.jetbrains.annotations.*;
@@ -171,7 +171,7 @@ public class IgniteMock implements Ignite {
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
----------------------------------------------------------------------
diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
index 6f5553b..53d6f26 100644
--- a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
+++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
@@ -17,9 +17,6 @@
package org.apache.ignite.scalar
-import java.net.URL
-import java.util.UUID
-
import org.apache.ignite._
import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
import org.apache.ignite.cluster.ClusterNode
@@ -27,6 +24,9 @@ import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.internal.IgniteVersionUtils._
import org.jetbrains.annotations.Nullable
+import java.net.URL
+import java.util.UUID
+
import scala.annotation.meta.field
/**
@@ -294,7 +294,7 @@ object scalar extends ScalarConversions {
*/
@inline def dataLoader$[K, V](
@Nullable cacheName: String,
- bufSize: Int): IgniteDataLoader[K, V] = {
+ bufSize: Int): IgniteDataStreamer[K, V] = {
val dl = ignite$.dataLoader[K, V](cacheName)
dl.perNodeBufferSize(bufSize)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 31aa9e5..f9f6e9e 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -247,7 +247,7 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
assert g != null;
return g.dataLoader(cacheName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
index e26ab9f..b70d618 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
@@ -40,7 +40,7 @@ public class IgniteSqlQueryBenchmark extends IgniteCacheAbstractBenchmark {
long start = System.nanoTime();
- try (IgniteDataLoader<Integer, Person> dataLdr = ignite().dataLoader(cache.getName())) {
+ try (IgniteDataStreamer<Integer, Person> dataLdr = ignite().dataLoader(cache.getName())) {
for (int i = 0; i < args.range() && !Thread.currentThread().isInterrupted(); i++) {
dataLdr.addData(i, new Person(i, "firstName" + i, "lastName" + i, i * 1000));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
index 4e34d14..f6ba47a 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
@@ -39,7 +39,7 @@ public class IgniteSqlQueryJoinBenchmark extends IgniteCacheAbstractBenchmark {
long start = System.nanoTime();
- try (IgniteDataLoader<Object, Object> dataLdr = ignite().dataLoader(cache.getName())) {
+ try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataLoader(cache.getName())) {
final int orgRange = args.range() / 10;
// Populate organizations.
[2/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader ->
IgniteDataStreamer.java + impl
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
deleted file mode 100644
index ed3bbcb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ /dev/null
@@ -1,1453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data loader implementation.
- */
-@SuppressWarnings("unchecked")
-public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed {
- /** Isolated updater. */
- private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
-
- /** Cache updater. */
- private Updater<K, V> updater = ISOLATED_UPDATER;
-
- /** */
- private byte[] updaterBytes;
-
- /** Max remap count before issuing an error. */
- private static final int DFLT_MAX_REMAP_CNT = 32;
-
- /** Log reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Cache name ({@code null} for default cache). */
- private final String cacheName;
-
- /** Portable enabled flag. */
- private final boolean portableEnabled;
-
- /**
- * If {@code true} then data will be transferred in compact format (only keys and values).
- * Otherwise full map entry will be transferred (this is requires by DR internal logic).
- */
- private final boolean compact;
-
- /** Per-node buffer size. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
- /** */
- private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
- /** */
- private long autoFlushFreq;
-
- /** Mapping. */
- @GridToStringInclude
- private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Discovery listener. */
- private final GridLocalEventListener discoLsnr;
-
- /** Context. */
- private final GridKernalContext ctx;
-
- /** Communication topic for responses. */
- private final Object topic;
-
- /** */
- private byte[] topicBytes;
-
- /** {@code True} if data loader has been cancelled. */
- private volatile boolean cancelled;
-
- /** Active futures of this data loader. */
- @GridToStringInclude
- private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
-
- /** Closure to remove from active futures. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- boolean rmv = activeFuts.remove(t);
-
- assert rmv;
- }
- };
-
- /** Job peer deploy aware. */
- private volatile GridPeerDeployAware jobPda;
-
- /** Deployment class. */
- private Class<?> depCls;
-
- /** Future to track loading finish. */
- private final GridFutureAdapter<?> fut;
-
- /** Public API future to track loading finish. */
- private final IgniteFuture<?> publicFut;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Closed flag. */
- private final AtomicBoolean closed = new AtomicBoolean();
-
- /** */
- private volatile long lastFlushTime = U.currentTimeMillis();
-
- /** */
- private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ;
-
- /** */
- private boolean skipStore;
-
- /** */
- private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
-
- /**
- * @param ctx Grid kernal context.
- * @param cacheName Cache name.
- * @param flushQ Flush queue.
- * @param compact If {@code true} data is transferred in compact mode (only keys and values).
- * Otherwise full map entry will be transferred (this is required by DR internal logic).
- */
- public IgniteDataLoaderImpl(
- final GridKernalContext ctx,
- @Nullable final String cacheName,
- DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ,
- boolean compact
- ) {
- assert ctx != null;
-
- this.ctx = ctx;
- this.cacheName = cacheName;
- this.flushQ = flushQ;
- this.compact = compact;
-
- log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IllegalStateException("Cache doesn't exist: " + cacheName);
-
- portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
- discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID id = discoEvt.eventNode().id();
-
- // Remap regular mappings.
- final Buffer buf = bufMappings.remove(id);
-
- if (buf != null) {
- // Only async notification is possible since
- // discovery thread may be trapped otherwise.
- ctx.closure().callLocalSafe(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- buf.onNodeLeft();
-
- return null;
- }
- },
- true /* system pool */
- );
- }
- }
- };
-
- ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- // Generate unique topic for this loader.
- topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
- ctx.io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridDataLoadResponse;
-
- GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf != null)
- buf.onResponse(res);
-
- else if (log.isDebugEnabled())
- log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
- }
- });
-
- if (log.isDebugEnabled())
- log.debug("Added response listener within topic: " + topic);
-
- fut = new GridDataLoaderFuture(ctx, this);
-
- publicFut = new IgniteFutureImpl<>(fut);
- }
-
- /**
- * Enters busy lock.
- */
- private void enterBusy() {
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Data loader has been closed.");
- }
-
- /**
- * Leaves busy lock.
- */
- private void leaveBusy() {
- busyLock.leaveBusy();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> future() {
- return publicFut;
- }
-
- /**
- * @return Internal future.
- */
- public IgniteInternalFuture<?> internalFuture() {
- return fut;
- }
-
- /** {@inheritDoc} */
- @Override public void deployClass(Class<?> depCls) {
- this.depCls = depCls;
- }
-
- /** {@inheritDoc} */
- @Override public void updater(Updater<K, V> updater) {
- A.notNull(updater, "updater");
-
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public boolean allowOverwrite() {
- return updater != ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public void allowOverwrite(boolean allow) {
- if (allow == allowOverwrite())
- return;
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IgniteException("Failed to get node for cache: " + cacheName);
-
- updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public void skipStore(boolean skipStore) {
- this.skipStore = skipStore;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public String cacheName() {
- return cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeBufferSize() {
- return bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeBufferSize(int bufSize) {
- A.ensure(bufSize > 0, "bufSize > 0");
-
- this.bufSize = bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeParallelLoadOperations() {
- return parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeParallelLoadOperations(int parallelOps) {
- this.parallelOps = parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public long autoFlushFrequency() {
- return autoFlushFreq;
- }
-
- /** {@inheritDoc} */
- @Override public void autoFlushFrequency(long autoFlushFreq) {
- A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
- long old = this.autoFlushFreq;
-
- if (autoFlushFreq != old) {
- this.autoFlushFreq = autoFlushFreq;
-
- if (autoFlushFreq != 0 && old == 0)
- flushQ.add(this);
- else if (autoFlushFreq == 0)
- flushQ.remove(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
- A.notNull(entries, "entries");
-
- return addData(entries.entrySet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
- A.notEmpty(entries, "entries");
-
- enterBusy();
-
- try {
- GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
- resFut.listenAsync(rmvActiveFut);
-
- activeFuts.add(resFut);
-
- Collection<K> keys = null;
-
- if (entries.size() > 1) {
- keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
-
- for (Map.Entry<K, V> entry : entries)
- keys.add(entry.getKey());
- }
-
- load0(entries, resFut, keys, 0);
-
- return new IgniteFutureImpl<>(resFut);
- }
- catch (IgniteException e) {
- return new IgniteFinishedFutureImpl<>(ctx, e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
- A.notNull(entry, "entry");
-
- return addData(F.asList(entry));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(K key, V val) {
- A.notNull(key, "key");
-
- return addData(new Entry0<>(key, val));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> removeData(K key) {
- return addData(key, null);
- }
-
- /**
- * @param entries Entries.
- * @param resFut Result future.
- * @param activeKeys Active keys.
- * @param remaps Remaps count.
- */
- private void load0(
- Collection<? extends Map.Entry<K, V>> entries,
- final GridFutureAdapter<Object> resFut,
- @Nullable final Collection<K> activeKeys,
- final int remaps
- ) {
- assert entries != null;
-
- Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
-
- boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
- for (Map.Entry<K, V> entry : entries) {
- List<ClusterNode> nodes;
-
- try {
- K key = entry.getKey();
-
- assert key != null;
-
- if (initPda) {
- jobPda = new DataLoaderPda(key, entry.getValue(), updater);
-
- initPda = false;
- }
-
- nodes = nodes(key);
- }
- catch (IgniteCheckedException e) {
- resFut.onDone(e);
-
- return;
- }
-
- if (F.isEmpty(nodes)) {
- resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
- "(no nodes with cache found in topology) [infos=" + entries.size() +
- ", cacheName=" + cacheName + ']'));
-
- return;
- }
-
- for (ClusterNode node : nodes) {
- Collection<Map.Entry<K, V>> col = mappings.get(node);
-
- if (col == null)
- mappings.put(node, col = new ArrayList<>());
-
- col.add(entry);
- }
- }
-
- for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
- final UUID nodeId = e.getKey().id();
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf == null) {
- Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
-
- if (old != null)
- buf = old;
- }
-
- final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
- IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- try {
- t.get();
-
- if (activeKeys != null) {
- for (Map.Entry<K, V> e : entriesForNode)
- activeKeys.remove(e.getKey());
-
- if (activeKeys.isEmpty())
- resFut.onDone();
- }
- else {
- assert entriesForNode.size() == 1;
-
- // That has been a single key,
- // so complete result future right away.
- resFut.onDone();
- }
- }
- catch (IgniteCheckedException e1) {
- if (log.isDebugEnabled())
- log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
-
- if (cancelled) {
- resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
- IgniteDataLoaderImpl.this, e1));
- }
- else if (remaps + 1 > maxRemapCnt) {
- resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
- + remaps), e1);
- }
- else
- load0(entriesForNode, resFut, activeKeys, remaps + 1);
- }
- }
- };
-
- GridFutureAdapter<?> f;
-
- try {
- f = buf.update(entriesForNode, lsnr);
- }
- catch (IgniteInterruptedCheckedException e1) {
- resFut.onDone(e1);
-
- return;
- }
-
- if (ctx.discovery().node(nodeId) == null) {
- if (bufMappings.remove(nodeId, buf))
- buf.onNodeLeft();
-
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
- }
- }
- }
-
- /**
- * @param key Key to map.
- * @return Nodes to send requests to.
- * @throws IgniteCheckedException If failed.
- */
- private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
- GridAffinityProcessor aff = ctx.affinity();
-
- return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
- Collections.singletonList(aff.mapKeyToNode(cacheName, key));
- }
-
- /**
- * Performs flush.
- *
- * @throws IgniteCheckedException If failed.
- */
- private void doFlush() throws IgniteCheckedException {
- lastFlushTime = U.currentTimeMillis();
-
- List<IgniteInternalFuture> activeFuts0 = null;
-
- int doneCnt = 0;
-
- for (IgniteInternalFuture<?> f : activeFuts) {
- if (!f.isDone()) {
- if (activeFuts0 == null)
- activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
-
- activeFuts0.add(f);
- }
- else {
- f.get();
-
- doneCnt++;
- }
- }
-
- if (activeFuts0 == null || activeFuts0.isEmpty())
- return;
-
- while (true) {
- Queue<IgniteInternalFuture<?>> q = null;
-
- for (Buffer buf : bufMappings.values()) {
- IgniteInternalFuture<?> flushFut = buf.flush();
-
- if (flushFut != null) {
- if (q == null)
- q = new ArrayDeque<>(bufMappings.size() * 2);
-
- q.add(flushFut);
- }
- }
-
- if (q != null) {
- assert !q.isEmpty();
-
- boolean err = false;
-
- for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to flush buffer: " + e);
-
- err = true;
- }
- }
-
- if (err)
- // Remaps needed - flush buffers.
- continue;
- }
-
- doneCnt = 0;
-
- for (int i = 0; i < activeFuts0.size(); i++) {
- IgniteInternalFuture f = activeFuts0.get(i);
-
- if (f == null)
- doneCnt++;
- else if (f.isDone()) {
- f.get();
-
- doneCnt++;
-
- activeFuts0.set(i, null);
- }
- else
- break;
- }
-
- if (doneCnt == activeFuts0.size())
- return;
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void flush() throws IgniteException {
- enterBusy();
-
- try {
- doFlush();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Flushes every internal buffer if buffer was flushed before passed in
- * threshold.
- * <p>
- * Does not wait for result and does not fail on errors assuming that this method
- * should be called periodically.
- */
- @Override public void tryFlush() throws IgniteInterruptedException {
- if (!busyLock.enterBusy())
- return;
-
- try {
- for (Buffer buf : bufMappings.values())
- buf.flush();
-
- lastFlushTime = U.currentTimeMillis();
- }
- catch (IgniteInterruptedCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteException If failed.
- */
- @Override public void close(boolean cancel) throws IgniteException {
- try {
- closeEx(cancel);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteCheckedException If failed.
- */
- public void closeEx(boolean cancel) throws IgniteCheckedException {
- if (!closed.compareAndSet(false, true))
- return;
-
- busyLock.block();
-
- if (log.isDebugEnabled())
- log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']');
-
- IgniteCheckedException e = null;
-
- try {
- // Assuming that no methods are called on this loader after this method is called.
- if (cancel) {
- cancelled = true;
-
- for (Buffer buf : bufMappings.values())
- buf.cancelAll();
- }
- else
- doFlush();
-
- ctx.event().removeLocalEventListener(discoLsnr);
-
- ctx.io().removeMessageListener(topic);
- }
- catch (IgniteCheckedException e0) {
- e = e0;
- }
-
- fut.onDone(null, e);
-
- if (e != null)
- throw e;
- }
-
- /**
- * @return {@code true} If the loader is closed.
- */
- boolean isClosed() {
- return fut.isDone();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- close(false);
- }
-
- /**
- * @return Max remap count.
- */
- public int maxRemapCount() {
- return maxRemapCnt;
- }
-
- /**
- * @param maxRemapCnt New max remap count.
- */
- public void maxRemapCount(int maxRemapCnt) {
- this.maxRemapCnt = maxRemapCnt;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDataLoaderImpl.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public long getDelay(TimeUnit unit) {
- return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
- }
-
- /**
- * @return Next flush time.
- */
- private long nextFlushTime() {
- return lastFlushTime + autoFlushFreq;
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(Delayed o) {
- return nextFlushTime() > ((IgniteDataLoaderImpl)o).nextFlushTime() ? 1 : -1;
- }
-
- /**
- *
- */
- private class Buffer {
- /** Node. */
- private final ClusterNode node;
-
- /** Active futures. */
- private final Collection<IgniteInternalFuture<Object>> locFuts;
-
- /** Buffered entries. */
- private List<Map.Entry<K, V>> entries;
-
- /** */
- @GridToStringExclude
- private GridFutureAdapter<Object> curFut;
-
- /** Local node flag. */
- private final boolean isLocNode;
-
- /** ID generator. */
- private final AtomicLong idGen = new AtomicLong();
-
- /** Active futures. */
- private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
- /** */
- private final Semaphore sem;
-
- /** Closure to signal on task finish. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- signalTaskFinished(t);
- }
- };
-
- /**
- * @param node Node.
- */
- Buffer(ClusterNode node) {
- assert node != null;
-
- this.node = node;
-
- locFuts = new GridConcurrentHashSet<>();
- reqs = new ConcurrentHashMap8<>();
-
- // Cache local node flag.
- isLocNode = node.equals(ctx.discovery().localNode());
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
-
- sem = new Semaphore(parallelOps);
- }
-
- /**
- * @param newEntries Infos.
- * @param lsnr Listener for the operation future.
- * @throws IgniteInterruptedCheckedException If failed.
- * @return Future for operation.
- */
- @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
- IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
-
- curFut0.listenAsync(lsnr);
-
- for (Map.Entry<K, V> entry : newEntries)
- entries.add(entry);
-
- if (entries.size() >= bufSize) {
- entries0 = entries;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null) {
- submit(entries0, curFut0);
-
- if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this));
- }
-
- return curFut0;
- }
-
- /**
- * @return Fresh collection with some space for outgrowth.
- */
- private List<Map.Entry<K, V>> newEntries() {
- return new ArrayList<>((int)(bufSize * 1.2));
- }
-
- /**
- * @return Future if any submitted.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0 = null;
-
- synchronized (this) {
- if (!entries.isEmpty()) {
- entries0 = entries;
- curFut0 = curFut;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null)
- submit(entries0, curFut0);
-
- // Create compound future for this flush.
- GridCompoundFuture<Object, Object> res = null;
-
- for (IgniteInternalFuture<Object> f : locFuts) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- for (IgniteInternalFuture<Object> f : reqs.values()) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- if (res != null)
- res.markInitialized();
-
- return res;
- }
-
- /**
- * Increments active tasks count.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
- U.acquire(sem);
- }
-
- /**
- * @param f Future that finished.
- */
- private void signalTaskFinished(IgniteInternalFuture<Object> f) {
- assert f != null;
-
- sem.release();
- }
-
- /**
- * @param entries Entries to submit.
- * @param curFut Current future.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
- throws IgniteInterruptedCheckedException {
- assert entries != null;
- assert !entries.isEmpty();
- assert curFut != null;
-
- incrementActiveTasks();
-
- IgniteInternalFuture<Object> fut;
-
- if (isLocNode) {
- fut = ctx.closure().callLocalSafe(
- new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
-
- locFuts.add(fut);
-
- fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- try {
- boolean rmv = locFuts.remove(t);
-
- assert rmv;
-
- curFut.onDone(t.get());
- }
- catch (IgniteCheckedException e) {
- curFut.onDone(e);
- }
- }
- });
- }
- else {
- byte[] entriesBytes;
-
- try {
- if (compact) {
- entriesBytes = ctx.config().getMarshaller()
- .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
- }
- else
- entriesBytes = ctx.config().getMarshaller().marshal(entries);
-
- if (updaterBytes == null) {
- assert updater != null;
-
- updaterBytes = ctx.config().getMarshaller().marshal(updater);
- }
-
- if (topicBytes == null)
- topicBytes = ctx.config().getMarshaller().marshal(topic);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal (request will not be sent).", e);
-
- return;
- }
-
- GridDeployment dep = null;
- GridPeerDeployAware jobPda0 = null;
-
- if (ctx.deploy().enabled()) {
- try {
- jobPda0 = jobPda;
-
- assert jobPda0 != null;
-
- dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
-
- GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
- if (cache != null)
- cache.context().deploy().onEnter();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
-
- return;
- }
-
- if (dep == null)
- U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
- }
-
- long reqId = idGen.incrementAndGet();
-
- fut = curFut;
-
- reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
- GridDataLoadRequest req = new GridDataLoadRequest(
- reqId,
- topicBytes,
- cacheName,
- updaterBytes,
- entriesBytes,
- true,
- skipStore,
- dep != null ? dep.deployMode() : null,
- dep != null ? jobPda0.deployClass().getName() : null,
- dep != null ? dep.userVersion() : null,
- dep != null ? dep.participants() : null,
- dep != null ? dep.classLoaderId() : null,
- dep == null);
-
- try {
- ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
- if (log.isDebugEnabled())
- log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
- ((GridFutureAdapter<Object>)fut).onDone(e);
- else
- ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
- "request (node has left): " + node.id()));
- }
- }
- }
-
- /**
- *
- */
- void onNodeLeft() {
- assert !isLocNode;
- assert bufMappings.get(node.id()) != this;
-
- if (log.isDebugEnabled())
- log.debug("Forcibly completing futures (node has left): " + node.id());
-
- Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + node.id());
-
- for (GridFutureAdapter<Object> f : reqs.values())
- f.onDone(e);
-
- // Make sure to complete current future.
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
- }
-
- curFut0.onDone(e);
- }
-
- /**
- * @param res Response.
- */
- void onResponse(GridDataLoadResponse res) {
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
- if (f == null) {
- if (log.isDebugEnabled())
- log.debug("Future for request has not been found: " + res.requestId());
-
- return;
- }
-
- Throwable err = null;
-
- byte[] errBytes = res.errorBytes();
-
- if (errBytes != null) {
- try {
- GridPeerDeployAware jobPda0 = jobPda;
-
- err = ctx.config().getMarshaller().unmarshal(
- errBytes,
- jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
- }
- catch (IgniteCheckedException e) {
- f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
-
- return;
- }
- }
-
- f.onDone(null, err);
-
- if (log.isDebugEnabled())
- log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
- }
-
- /**
- *
- */
- void cancelAll() {
- IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this);
-
- for (IgniteInternalFuture<?> f : locFuts) {
- try {
- f.cancel();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to cancel mini-future.", e);
- }
- }
-
- for (GridFutureAdapter<?> f : reqs.values())
- f.onDone(err);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- int size;
-
- synchronized (this) {
- size = entries.size();
- }
-
- return S.toString(Buffer.class, this,
- "entriesCnt", size,
- "locFutsSize", locFuts.size(),
- "reqsSize", reqs.size());
- }
- }
-
- /**
- * Data loader peer-deploy aware.
- */
- private class DataLoaderPda implements GridPeerDeployAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Deploy class. */
- private Class<?> cls;
-
- /** Class loader. */
- private ClassLoader ldr;
-
- /** Collection of objects to detect deploy class and class loader. */
- private Collection<Object> objs;
-
- /**
- * Constructs data loader peer-deploy aware.
- *
- * @param objs Collection of objects to detect deploy class and class loader.
- */
- private DataLoaderPda(Object... objs) {
- this.objs = Arrays.asList(objs);
- }
-
- /** {@inheritDoc} */
- @Override public Class<?> deployClass() {
- if (cls == null) {
- Class<?> cls0 = null;
-
- if (depCls != null)
- cls0 = depCls;
- else {
- for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
- Object o = it.next();
-
- if (o != null)
- cls0 = U.detectClass(o);
- }
-
- if (cls0 == null || U.isJdk(cls0))
- cls0 = IgniteDataLoaderImpl.class;
- }
-
- assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
-
- cls = cls0;
- }
-
- return cls;
- }
-
- /** {@inheritDoc} */
- @Override public ClassLoader classLoader() {
- if (ldr == null) {
- ClassLoader ldr0 = deployClass().getClassLoader();
-
- // Safety.
- if (ldr0 == null)
- ldr0 = U.gridClassLoader();
-
- assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
-
- ldr = ldr0;
- }
-
- return ldr;
- }
- }
-
- /**
- * Entry.
- */
- private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private K key;
-
- /** */
- private V val;
-
- /**
- * @param key Key.
- * @param val Value.
- */
- private Entry0(K key, @Nullable V val) {
- assert key != null;
-
- this.key = key;
- this.val = val;
- }
-
- /**
- * For {@link Externalizable}.
- */
- @SuppressWarnings("UnusedDeclaration")
- public Entry0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public K getKey() {
- return key;
- }
-
- /** {@inheritDoc} */
- @Override public V getValue() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public V setValue(V val) {
- V old = this.val;
-
- this.val = val;
-
- return old;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(key);
- out.writeObject(val);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- key = (K)in.readObject();
- val = (V)in.readObject();
- }
- }
-
- /**
- * Wrapper list with special compact serialization of map entries.
- */
- private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Wrapped delegate. */
- private Collection<Map.Entry<K, V>> delegate;
-
- /** Optional portable processor for converting values. */
- private GridPortableProcessor portable;
-
- /**
- * @param delegate Delegate.
- * @param portable Portable processor.
- */
- private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
- this.delegate = delegate;
- this.portable = portable;
- }
-
- /**
- * For {@link Externalizable}.
- */
- public Entries0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Entry<K, V>> iterator() {
- return delegate.iterator();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return delegate.size();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(delegate.size());
-
- boolean portableEnabled = portable != null;
-
- for (Map.Entry<K, V> entry : delegate) {
- if (portableEnabled) {
- out.writeObject(portable.marshalToPortable(entry.getKey()));
- out.writeObject(portable.marshalToPortable(entry.getValue()));
- }
- else {
- out.writeObject(entry.getKey());
- out.writeObject(entry.getValue());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int sz = in.readInt();
-
- delegate = new ArrayList<>(sz);
-
- for (int i = 0; i < sz; i++) {
- Object k = in.readObject();
- Object v = in.readObject();
-
- delegate.add(new Entry0<>((K)k, (V)v));
- }
- }
- }
-
- /**
- * Isolated updater which only loads entry initial value.
- */
- private static class IsolatedUpdater<K, V> implements Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
-
- GridCacheAdapter<K, V> internalCache = proxy.context().cache();
-
- if (internalCache.isNear())
- internalCache = internalCache.context().near().dht();
-
- GridCacheContext<K, V> cctx = internalCache.context();
-
- long topVer = cctx.affinity().affinityTopologyVersion();
-
- GridCacheVersion ver = cctx.versions().next(topVer);
-
- boolean portable = cctx.portableEnabled();
-
- for (Map.Entry<K, V> e : entries) {
- try {
- K key = e.getKey();
- V val = e.getValue();
-
- if (portable) {
- key = (K)cctx.marshalToPortable(key);
- val = (V)cctx.marshalToPortable(val);
- }
-
- GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
-
- entry.unswap(true, false);
-
- entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
- GridDrType.DR_LOAD);
-
- cctx.evicts().touch(entry, topVer);
- }
- catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
- // No-op.
- }
- catch (IgniteCheckedException ex) {
- IgniteLogger log = cache.unwrap(Ignite.class).log();
-
- U.error(log, "Failed to set initial value for cache entry: " + e, ex);
- }
- }
- }
- }
-}
[3/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader ->
IgniteDataStreamer.java + impl
Posted by sb...@apache.org.
# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b33b651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b33b651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b33b651
Branch: refs/heads/ignite-394
Commit: 9b33b6510f5b82c30c8e75a66eb328b00bc425e4
Parents: 6909cc4
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue Mar 3 21:40:19 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue Mar 3 21:40:19 2015 +0300
----------------------------------------------------------------------
.../datagrid/CacheDataLoaderExample.java | 6 +-
.../datagrid/CachePopularNumbersExample.java | 4 +-
.../src/main/java/org/apache/ignite/Ignite.java | 6 +-
.../org/apache/ignite/IgniteDataLoader.java | 379 -----
.../org/apache/ignite/IgniteDataStreamer.java | 379 +++++
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 10 +-
.../GridDistributedCacheAdapter.java | 4 +-
.../dataload/GridDataLoadCacheUpdaters.java | 18 +-
.../dataload/GridDataLoadUpdateJob.java | 4 +-
.../dataload/GridDataLoaderFuture.java | 4 +-
.../dataload/GridDataLoaderProcessor.java | 16 +-
.../dataload/IgniteDataLoaderImpl.java | 1453 ------------------
.../dataload/IgniteDataStreamerImpl.java | 1453 ++++++++++++++++++
.../dr/GridDrDataLoadCacheUpdater.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 10 +-
...iteTxConsistencyRestartAbstractSelfTest.java | 2 +-
...idCachePartitionedHitsAndMissesSelfTest.java | 4 +-
.../GridCacheLruNearEvictionPolicySelfTest.java | 2 +-
...heNearOnlyLruNearEvictionPolicySelfTest.java | 2 +-
.../dataload/GridDataLoaderImplSelfTest.java | 5 +-
.../dataload/GridDataLoaderPerformanceTest.java | 2 +-
.../GridDataLoaderProcessorSelfTest.java | 27 +-
.../loadtests/colocation/GridTestMain.java | 2 +-
.../loadtests/discovery/GridGcTimeoutTest.java | 2 +-
.../mapper/GridContinuousMapperLoadTest1.java | 2 +-
.../mapper/GridContinuousMapperLoadTest2.java | 2 +-
.../ignite/testframework/junits/IgniteMock.java | 4 +-
.../scala/org/apache/ignite/scalar/scalar.scala | 8 +-
.../org/apache/ignite/IgniteSpringBean.java | 2 +-
.../cache/IgniteSqlQueryBenchmark.java | 2 +-
.../cache/IgniteSqlQueryJoinBenchmark.java | 2 +-
32 files changed, 1909 insertions(+), 1911 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
index 57b0cd2..4cdbfd4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
@@ -21,8 +21,8 @@ import org.apache.ignite.*;
import org.apache.ignite.examples.*;
/**
- * Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API.
- * {@link IgniteDataLoader} is a lot more efficient to use than standard
+ * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard
* {@code put(...)} operation as it properly buffers cache requests
* together and properly manages load on remote nodes.
* <p>
@@ -63,7 +63,7 @@ public class CacheDataLoaderExample {
long start = System.currentTimeMillis();
- try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
+ try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
// Configure loader.
ldr.perNodeBufferSize(1024);
ldr.perNodeParallelLoadOperations(8);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index 0f71681..1fc737b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -92,7 +92,7 @@ public class CachePopularNumbersExample {
* @throws IgniteException If failed.
*/
private static void streamData(final Ignite ignite) throws IgniteException {
- try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
+ try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
// Set larger per-node buffer size since our state is relatively small.
ldr.perNodeBufferSize(2048);
@@ -140,7 +140,7 @@ public class CachePopularNumbersExample {
/**
* Increments value for key.
*/
- private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+ private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
/** */
private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
@Override public Void process(MutableEntry<Integer, Long> e, Object... args) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 8851d8f..44d4ba9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -42,7 +42,7 @@ import java.util.concurrent.*;
* In addition to {@link ClusterGroup} functionality, from here you can get the following:
* <ul>
* <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li>
- * <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li>
+ * <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of data into cache.</li>
* <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li>
* <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li>
* <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li>
@@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable {
/**
* Gets a new instance of data loader associated with given cache name. Data loader
* is responsible for loading external data into in-memory data grid. For more information
- * refer to {@link IgniteDataLoader} documentation.
+ * refer to {@link IgniteDataStreamer} documentation.
*
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
- public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName);
+ public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName);
/**
* Gets an instance of IGFS - Ignite In-Memory File System, if one is not
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
deleted file mode 100644
index 3cff287..0000000
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data loader is responsible for loading external data into cache. It achieves it by
- * properly buffering updates and properly mapping keys to nodes responsible for the data
- * to make sure that there is the least amount of data movement possible and optimal
- * network and memory utilization.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
- * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
- * method to load data from underlying data store. You can also use standard
- * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
- * likely will not perform as well as this class for loading data. And finally,
- * data can be loaded from underlying data store on demand, whenever it is accessed -
- * for this no explicit data loading step is needed.
- * <p>
- * {@code IgniteDataLoader} supports the following configuration properties:
- * <ul>
- * <li>
- * {@link #perNodeBufferSize(int)} - when entries are added to data loader via
- * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
- * away and are buffered internally for better performance and network utilization.
- * This setting controls the size of internal per-node buffer before buffered data
- * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
- * value.
- * </li>
- * <li>
- * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
- * to the data loader via {@link #addData(Object, Object)} method faster than it can
- * be put in cache. In this case, new buffered load messages are sent to remote nodes
- * before responses from previous ones are received. This could cause unlimited heap
- * memory utilization growth on local and remote nodes. To control memory utilization,
- * this setting limits maximum allowed number of parallel buffered load messages that
- * are being processed on remote nodes. If this number is exceeded, then
- * {@link #addData(Object, Object)} method will block to control memory utilization.
- * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
- * </li>
- * <li>
- * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
- * this is the time after which the loader will make an attempt to submit all data
- * added so far to remote nodes. Note that there is no guarantee that data will be
- * delivered after this concrete attempt (e.g., it can fail when topology is
- * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
- * </li>
- * <li>
- * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
- * updates and allow data loader choose most optimal concurrent implementation.
- * </li>
- * <li>
- * {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
- * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
- * </li>
- * <li>
- * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
- * loaded by a data loader must be class-loadable from the same class-loader.
- * Ignite will make the best effort to detect the most suitable class-loader
- * for data loading. However, in complex cases, where compound or deeply nested
- * class-loaders are used, it is best to specify a deploy class which can be any
- * class loaded by the class-loader for given data.
- * </li>
- * </ul>
- */
-public interface IgniteDataLoader<K, V> extends AutoCloseable {
- /** Default max concurrent put operations count. */
- public static final int DFLT_MAX_PARALLEL_OPS = 16;
-
- /** Default per node buffer size. */
- public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
-
- /**
- * Name of cache to load data to.
- *
- * @return Cache name or {@code null} for default cache.
- */
- public String cacheName();
-
- /**
- * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
- * Default is {@code true}.
- *
- * @return Flag value.
- */
- public boolean allowOverwrite();
-
- /**
- * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
- * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
- * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
- *
- * @param allowOverwrite Flag value.
- * @throws IgniteException If failed.
- */
- public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
-
- /**
- * Gets flag indicating that write-through behavior should be disabled for data loading.
- * Default is {@code false}.
- *
- * @return Skip store flag.
- */
- public boolean skipStore();
-
- /**
- * Sets flag indicating that write-through behavior should be disabled for data loading.
- * Default is {@code false}.
- *
- * @param skipStore Skip store flag.
- */
- public void skipStore(boolean skipStore);
-
- /**
- * Gets size of per node key-value pairs buffer.
- *
- * @return Per node buffer size.
- */
- public int perNodeBufferSize();
-
- /**
- * Sets size of per node key-value pairs buffer.
- * <p>
- * This method should be called prior to {@link #addData(Object, Object)} call.
- * <p>
- * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
- *
- * @param bufSize Per node buffer size.
- */
- public void perNodeBufferSize(int bufSize);
-
- /**
- * Gets maximum number of parallel load operations for a single node.
- *
- * @return Maximum number of parallel load operations for a single node.
- */
- public int perNodeParallelLoadOperations();
-
- /**
- * Sets maximum number of parallel load operations for a single node.
- * <p>
- * This method should be called prior to {@link #addData(Object, Object)} call.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
- *
- * @param parallelOps Maximum number of parallel load operations for a single node.
- */
- public void perNodeParallelLoadOperations(int parallelOps);
-
- /**
- * Gets automatic flush frequency. Essentially, this is the time after which the
- * loader will make an attempt to submit all data added so far to remote nodes.
- * Note that there is no guarantee that data will be delivered after this concrete
- * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
- * <p>
- * If set to {@code 0}, automatic flush is disabled.
- * <p>
- * Automatic flush is disabled by default (default value is {@code 0}).
- *
- * @return Flush frequency or {@code 0} if automatic flush is disabled.
- * @see #flush()
- */
- public long autoFlushFrequency();
-
- /**
- * Sets automatic flush frequency. Essentially, this is the time after which the
- * loader will make an attempt to submit all data added so far to remote nodes.
- * Note that there is no guarantee that data will be delivered after this concrete
- * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
- * <p>
- * If set to {@code 0}, automatic flush is disabled.
- * <p>
- * Automatic flush is disabled by default (default value is {@code 0}).
- *
- * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
- * @see #flush()
- */
- public void autoFlushFrequency(long autoFlushFreq);
-
- /**
- * Gets future for this loading process. This future completes whenever method
- * {@link #close(boolean)} completes. By attaching listeners to this future
- * it is possible to get asynchronous notifications for completion of this
- * loading process.
- *
- * @return Future for this loading process.
- */
- public IgniteFuture<?> future();
-
- /**
- * Optional deploy class for peer deployment. All classes loaded by a data loader
- * must be class-loadable from the same class-loader. Ignite will make the best
- * effort to detect the most suitable class-loader for data loading. However,
- * in complex cases, where compound or deeply nested class-loaders are used,
- * it is best to specify a deploy class which can be any class loaded by
- * the class-loader for given data.
- *
- * @param depCls Any class loaded by the class-loader for given data.
- */
- public void deployClass(Class<?> depCls);
-
- /**
- * Sets custom cache updater to this data loader.
- *
- * @param updater Cache updater.
- */
- public void updater(Updater<K, V> updater);
-
- /**
- * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
- *
- * @param key Key.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param key Key.
- * @param val Value or {@code null} if respective entry must be removed from cache.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
- IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entry Entry.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
- IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entries Collection of entries to be loaded.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @return Future for this load operation.
- */
- public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entries Map to be loaded.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @return Future for this load operation.
- */
- public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
-
- /**
- * Loads any remaining data, but doesn't close the loader. Data can be still added after
- * flush is finished. This method blocks and doesn't allow to add any data until all data
- * is loaded.
- * <p>
- * If another thread is already performing flush, this method will block, wait for
- * another thread to complete flush and exit. If you don't want to wait in this case,
- * use {@link #tryFlush()} method.
- *
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @see #tryFlush()
- */
- public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
- * with the difference that it won't wait and will exit immediately.
- *
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @see #flush()
- */
- public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Loads any remaining data and closes this loader.
- *
- * @param cancel {@code True} to cancel ongoing loading operations.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- */
- public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
-
- /**
- * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
- * <p>
- * The method is invoked automatically on objects managed by the
- * {@code try-with-resources} statement.
- *
- * @throws IgniteException If failed to close data loader.
- * @throws IgniteInterruptedException If thread has been interrupted.
- */
- @Override public void close() throws IgniteException, IgniteInterruptedException;
-
- /**
- * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
- * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
- * performance custom user-defined implementation may help.
- * <p>
- * Data loader can be configured to use custom implementation of updater instead of default one using
- * {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method.
- */
- interface Updater<K, V> extends Serializable {
- /**
- * Updates cache with batch of entries.
- *
- * @param cache Cache.
- * @param entries Collection of entries.
- * @throws IgniteException If failed.
- */
- public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
new file mode 100644
index 0000000..c48d61a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Data loader is responsible for loading external data into cache. It achieves it by
+ * properly buffering updates and properly mapping keys to nodes responsible for the data
+ * to make sure that there is the least amount of data movement possible and optimal
+ * network and memory utilization.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ * <p>
+ * Also note that {@code GridDataLoader} is not the only way to load data into cache.
+ * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
+ * method to load data from underlying data store. You can also use standard
+ * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
+ * likely will not perform as well as this class for loading data. And finally,
+ * data can be loaded from underlying data store on demand, whenever it is accessed -
+ * for this no explicit data loading step is needed.
+ * <p>
+ * {@code IgniteDataLoader} supports the following configuration properties:
+ * <ul>
+ * <li>
+ * {@link #perNodeBufferSize(int)} - when entries are added to data loader via
+ * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
+ * away and are buffered internally for better performance and network utilization.
+ * This setting controls the size of internal per-node buffer before buffered data
+ * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
+ * value.
+ * </li>
+ * <li>
+ * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
+ * to the data loader via {@link #addData(Object, Object)} method faster than it can
+ * be put in cache. In this case, new buffered load messages are sent to remote nodes
+ * before responses from previous ones are received. This could cause unlimited heap
+ * memory utilization growth on local and remote nodes. To control memory utilization,
+ * this setting limits maximum allowed number of parallel buffered load messages that
+ * are being processed on remote nodes. If this number is exceeded, then
+ * {@link #addData(Object, Object)} method will block to control memory utilization.
+ * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ * </li>
+ * <li>
+ * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
+ * this is the time after which the loader will make an attempt to submit all data
+ * added so far to remote nodes. Note that there is no guarantee that data will be
+ * delivered after this concrete attempt (e.g., it can fail when topology is
+ * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
+ * </li>
+ * <li>
+ * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
+ * updates and allow data loader choose most optimal concurrent implementation.
+ * </li>
+ * <li>
+ * {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries.
+ * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
+ * </li>
+ * <li>
+ * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
+ * loaded by a data loader must be class-loadable from the same class-loader.
+ * Ignite will make the best effort to detect the most suitable class-loader
+ * for data loading. However, in complex cases, where compound or deeply nested
+ * class-loaders are used, it is best to specify a deploy class which can be any
+ * class loaded by the class-loader for given data.
+ * </li>
+ * </ul>
+ */
+public interface IgniteDataStreamer<K, V> extends AutoCloseable {
+ /** Default max concurrent put operations count. */
+ public static final int DFLT_MAX_PARALLEL_OPS = 16;
+
+ /** Default per node buffer size. */
+ public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+
+ /**
+ * Name of cache to load data to.
+ *
+ * @return Cache name or {@code null} for default cache.
+ */
+ public String cacheName();
+
+ /**
+ * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
+ * Default is {@code true}.
+ *
+ * @return Flag value.
+ */
+ public boolean allowOverwrite();
+
+ /**
+ * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
+ * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
+ * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
+ *
+ * @param allowOverwrite Flag value.
+ * @throws IgniteException If failed.
+ */
+ public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
+
+ /**
+ * Gets flag indicating that write-through behavior should be disabled for data loading.
+ * Default is {@code false}.
+ *
+ * @return Skip store flag.
+ */
+ public boolean skipStore();
+
+ /**
+ * Sets flag indicating that write-through behavior should be disabled for data loading.
+ * Default is {@code false}.
+ *
+ * @param skipStore Skip store flag.
+ */
+ public void skipStore(boolean skipStore);
+
+ /**
+ * Gets size of per node key-value pairs buffer.
+ *
+ * @return Per node buffer size.
+ */
+ public int perNodeBufferSize();
+
+ /**
+ * Sets size of per node key-value pairs buffer.
+ * <p>
+ * This method should be called prior to {@link #addData(Object, Object)} call.
+ * <p>
+ * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
+ *
+ * @param bufSize Per node buffer size.
+ */
+ public void perNodeBufferSize(int bufSize);
+
+ /**
+ * Gets maximum number of parallel load operations for a single node.
+ *
+ * @return Maximum number of parallel load operations for a single node.
+ */
+ public int perNodeParallelLoadOperations();
+
+ /**
+ * Sets maximum number of parallel load operations for a single node.
+ * <p>
+ * This method should be called prior to {@link #addData(Object, Object)} call.
+ * <p>
+ * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+ *
+ * @param parallelOps Maximum number of parallel load operations for a single node.
+ */
+ public void perNodeParallelLoadOperations(int parallelOps);
+
+ /**
+ * Gets automatic flush frequency. Essentially, this is the time after which the
+ * loader will make an attempt to submit all data added so far to remote nodes.
+ * Note that there is no guarantee that data will be delivered after this concrete
+ * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ * <p>
+ * If set to {@code 0}, automatic flush is disabled.
+ * <p>
+ * Automatic flush is disabled by default (default value is {@code 0}).
+ *
+ * @return Flush frequency or {@code 0} if automatic flush is disabled.
+ * @see #flush()
+ */
+ public long autoFlushFrequency();
+
+ /**
+ * Sets automatic flush frequency. Essentially, this is the time after which the
+ * loader will make an attempt to submit all data added so far to remote nodes.
+ * Note that there is no guarantee that data will be delivered after this concrete
+ * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ * <p>
+ * If set to {@code 0}, automatic flush is disabled.
+ * <p>
+ * Automatic flush is disabled by default (default value is {@code 0}).
+ *
+ * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
+ * @see #flush()
+ */
+ public void autoFlushFrequency(long autoFlushFreq);
+
+ /**
+ * Gets future for this loading process. This future completes whenever method
+ * {@link #close(boolean)} completes. By attaching listeners to this future
+ * it is possible to get asynchronous notifications for completion of this
+ * loading process.
+ *
+ * @return Future for this loading process.
+ */
+ public IgniteFuture<?> future();
+
+ /**
+ * Optional deploy class for peer deployment. All classes loaded by a data loader
+ * must be class-loadable from the same class-loader. Ignite will make the best
+ * effort to detect the most suitable class-loader for data loading. However,
+ * in complex cases, where compound or deeply nested class-loaders are used,
+ * it is best to specify a deploy class which can be any class loaded by
+ * the class-loader for given data.
+ *
+ * @param depCls Any class loaded by the class-loader for given data.
+ */
+ public void deployClass(Class<?> depCls);
+
+ /**
+ * Sets custom cache updater to this data loader.
+ *
+ * @param updater Cache updater.
+ */
+ public void updater(Updater<K, V> updater);
+
+ /**
+ * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
+ *
+ * @param key Key.
+ * @return Future fo this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param key Key.
+ * @param val Value or {@code null} if respective entry must be removed from cache.
+ * @return Future fo this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
+ IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entry Entry.
+ * @return Future fo this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
+ IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entries Collection of entries to be loaded.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @return Future for this load operation.
+ */
+ public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entries Map to be loaded.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @return Future for this load operation.
+ */
+ public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
+
+ /**
+ * Loads any remaining data, but doesn't close the loader. Data can be still added after
+ * flush is finished. This method blocks and doesn't allow to add any data until all data
+ * is loaded.
+ * <p>
+ * If another thread is already performing flush, this method will block, wait for
+ * another thread to complete flush and exit. If you don't want to wait in this case,
+ * use {@link #tryFlush()} method.
+ *
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @see #tryFlush()
+ */
+ public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
+ * with the difference that it won't wait and will exit immediately.
+ *
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @see #flush()
+ */
+ public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Loads any remaining data and closes this loader.
+ *
+ * @param cancel {@code True} to cancel ongoing loading operations.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ */
+ public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
+
+ /**
+ * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
+ * <p>
+ * The method is invoked automatically on objects managed by the
+ * {@code try-with-resources} statement.
+ *
+ * @throws IgniteException If failed to close data loader.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ */
+ @Override public void close() throws IgniteException, IgniteInterruptedException;
+
+ /**
+ * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)}
+ * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
+ * performance custom user-defined implementation may help.
+ * <p>
+ * Data loader can be configured to use custom implementation of updater instead of default one using
+ * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
+ */
+ interface Updater<K, V> extends Serializable {
+ /**
+ * Updates cache with batch of entries.
+ *
+ * @param cache Cache.
+ * @param entries Collection of entries.
+ * @throws IgniteException If failed.
+ */
+ public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f46d071..336f872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2346,7 +2346,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
guard();
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..6ed5699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3877,7 +3877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
if (ctx.store().isLocalStore()) {
- IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+ IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -4043,7 +4043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @throws IgniteCheckedException If failed.
*/
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
- try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+ try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.allowOverwrite(true);
ldr.skipStore(true);
@@ -4086,7 +4086,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
if (ctx.store().isLocalStore()) {
- IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+ IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -6134,7 +6134,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final Collection<Map.Entry<K, V>> col;
/** */
- final IgniteDataLoaderImpl<K, V> ldr;
+ final IgniteDataStreamerImpl<K, V> ldr;
/** */
final ExpiryPolicy plc;
@@ -6145,7 +6145,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @param plc Optional expiry policy.
*/
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
- IgniteDataLoaderImpl<K, V> ldr,
+ IgniteDataStreamerImpl<K, V> ldr,
@Nullable ExpiryPolicy plc) {
this.p = p;
this.ldr = ldr;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 00190d9..c99efc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -276,8 +276,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
else
dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
- try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) {
- ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0);
+ try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) {
+ ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
index e2e780b..78a7e62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -29,13 +29,13 @@ import java.util.*;
*/
public class GridDataLoadCacheUpdaters {
/** */
- private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual();
+ private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
/** */
- private static final IgniteDataLoader.Updater BATCHED = new Batched();
+ private static final IgniteDataStreamer.Updater BATCHED = new Batched();
/** */
- private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted();
+ private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
/**
* Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
@@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Single updater.
*/
- public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
+ public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
return INDIVIDUAL;
}
@@ -55,7 +55,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Batched updater.
*/
- public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
+ public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
return BATCHED;
}
@@ -66,7 +66,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Batched sorted updater.
*/
- public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() {
+ public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
return BATCHED_SORTED;
}
@@ -93,7 +93,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
*/
- private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -120,7 +120,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
- private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -160,7 +160,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
- private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
index 9e2a483..8aa554a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
private final boolean skipStore;
/** */
- private final IgniteDataLoader.Updater<K, V> updater;
+ private final IgniteDataStreamer.Updater<K, V> updater;
/**
* @param ctx Context.
@@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
Collection<Map.Entry<K, V>> col,
boolean ignoreDepOwnership,
boolean skipStore,
- IgniteDataLoader.Updater<K, V> updater) {
+ IgniteDataStreamer.Updater<K, V> updater) {
this.ctx = ctx;
this.log = log;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
index 5efcfe9..dffa862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
/** Data loader. */
@GridToStringExclude
- private IgniteDataLoaderImpl dataLdr;
+ private IgniteDataStreamerImpl dataLdr;
/**
* Default constructor for {@link Externalizable} support.
@@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
* @param ctx Context.
* @param dataLdr Data loader.
*/
- GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
+ GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
super(ctx);
assert dataLdr != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
index d470d02..b29c9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
*/
public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
/** Loaders map (access is not supposed to be highly concurrent). */
- private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
+ private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
private Thread flusher;
/** */
- private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
/** Marshaller. */
private final Marshaller marsh;
@@ -80,7 +80,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
- IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
+ IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
if (!busyLock.enterBusy())
return;
@@ -118,7 +118,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
U.interrupt(flusher);
U.join(flusher, log);
- for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
+ for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
if (log.isDebugEnabled())
log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
@@ -142,12 +142,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
* @param compact {@code true} if data loader should transfer data in compact format.
* @return Data loader.
*/
- public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
+ public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to create data loader (grid is stopping).");
try {
- final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
+ final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
ldrs.add(ldr);
@@ -173,7 +173,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
- public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
return dataLoader(cacheName, true);
}
@@ -234,7 +234,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
}
Collection<Map.Entry<K, V>> col;
- IgniteDataLoader.Updater<K, V> updater;
+ IgniteDataStreamer.Updater<K, V> updater;
try {
col = marsh.unmarshal(req.collectionBytes(), clsLdr);