You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/05 15:45:09 UTC

ignite git commit: Added "affinity".

Repository: ignite
Updated Branches:
  refs/heads/ignite-3247 414aabf1f -> f029f51d9


Added "affinity".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f029f51d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f029f51d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f029f51d

Branch: refs/heads/ignite-3247
Commit: f029f51d9ae67ff970818d665803fa3c3aebf200
Parents: 414aabf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Sun Jun 5 18:45:02 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Sun Jun 5 18:45:02 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsBlockLocationImpl.java  | 87 ++++++++++++++----
 .../internal/processors/igfs/IgfsImpl.java      |  4 +
 .../igfs/client/IgfsClientAffinityCallable.java | 95 ++++++++++++++++++++
 .../ignite/internal/util/IgniteUtils.java       | 40 +++++++++
 4 files changed, 211 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f029f51d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
index 0ec31ba..2d4a0af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
@@ -27,6 +27,12 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -37,7 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  * File block location in the grid.
  */
-public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable {
+public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable, Binarylizable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -155,13 +161,7 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable
         return S.toString(IgfsBlockLocationImpl.class, this);
     }
 
-    /**
-     * Writes this object to data output. Note that this is not externalizable
-     * interface because we want to eliminate any marshaller.
-     *
-     * @param out Data output to write.
-     * @throws IOException If write failed.
-     */
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         assert names != null;
         assert hosts != null;
@@ -189,13 +189,7 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable
             out.writeUTF(host);
     }
 
-    /**
-     * Reads object from data input. Note we do not use externalizable interface
-     * to eliminate marshaller.
-     *
-     * @param in Data input.
-     * @throws IOException If read failed.
-     */
+    /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException {
         start = in.readLong();
         len = in.readLong();
@@ -226,6 +220,69 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable
             hosts.add(in.readUTF());
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter rawWriter = writer.rawWriter();
+
+        assert names != null;
+        assert hosts != null;
+
+        rawWriter.writeLong(start);
+        rawWriter.writeLong(len);
+
+        rawWriter.writeBoolean(nodeIds != null);
+
+        if (nodeIds != null) {
+            rawWriter.writeInt(nodeIds.size());
+
+            for (UUID nodeId : nodeIds)
+                U.writeUuid(rawWriter, nodeId);
+        }
+
+        rawWriter.writeInt(names.size());
+
+        for (String name : names)
+            rawWriter.writeString(name);
+
+        rawWriter.writeInt(hosts.size());
+
+        for (String host : hosts)
+            rawWriter.writeString(host);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader rawReader = reader.rawReader();
+
+        start = rawReader.readLong();
+        len = rawReader.readLong();
+
+        int size;
+
+        if (rawReader.readBoolean()) {
+            size = rawReader.readInt();
+
+            nodeIds = new ArrayList<>(size);
+
+            for (int i = 0; i < size; i++)
+                nodeIds.add(U.readUuid(rawReader));
+        }
+
+        size = rawReader.readInt();
+
+        names = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            names.add(rawReader.readString());
+
+        size = rawReader.readInt();
+
+        hosts = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            hosts.add(rawReader.readString());
+    }
+
     /**
      * Converts collection of rich nodes to block location data.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/f029f51d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 445d158..10a3f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientExistsCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientInfoCallable;
@@ -1242,6 +1243,9 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(start >= 0, "start >= 0");
         A.ensure(len >= 0, "len >= 0");
 
+        if (meta.isClient())
+            return meta.runClientTask(new IgfsClientAffinityCallable(cfg.getName(), path, start, len, maxLen));
+
         return safeOp(new Callable<Collection<IgfsBlockLocation>>() {
             @Override public Collection<IgfsBlockLocation> call() throws Exception {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f029f51d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java
new file mode 100644
index 0000000..1668f36
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+
+/**
+ * IGFS client affinity callable.
+ */
+public class IgfsClientAffinityCallable extends IgfsClientAbstractCallable<Collection<IgfsBlockLocation>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Start. */
+    private long start;
+
+    /** Length. */
+    private long len;
+
+    /** Maximum length. */
+    private long maxLen;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsClientAffinityCallable() {
+        // NO-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param igfsName IGFS name.
+     * @param path Path.
+     * @param start Start.
+     * @param len Length.
+     * @param maxLen Maximum length.
+     */
+    public IgfsClientAffinityCallable(@Nullable String igfsName, IgfsPath path, long start, long len, long maxLen) {
+        super(igfsName, path);
+
+        this.start = start;
+        this.len = len;
+        this.maxLen = maxLen;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<IgfsBlockLocation> call0(IgfsContext ctx) throws Exception {
+        return ctx.igfs().affinity(path, start, len, maxLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary0(BinaryRawWriter writer) throws BinaryObjectException {
+        writer.writeLong(start);
+        writer.writeLong(len);
+        writer.writeLong(maxLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException {
+        start = reader.readLong();
+        len = reader.readLong();
+        maxLen = reader.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientAffinityCallable.class, this);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/f029f51d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3717d31..c898909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -149,6 +149,8 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.cluster.ClusterGroupEmptyException;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
@@ -4710,6 +4712,44 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Writes UUID to binary writer.
+     *
+     * @param out Output Binary writer.
+     * @param uid UUID to write.
+     * @throws IOException If write failed.
+     */
+    public static void writeUuid(BinaryRawWriter out, UUID uid) {
+        // Write null flag.
+        if (uid != null) {
+            out.writeBoolean(true);
+
+            out.writeLong(uid.getMostSignificantBits());
+            out.writeLong(uid.getLeastSignificantBits());
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /**
+     * Reads UUID from binary reader.
+     *
+     * @param in Binary reader.
+     * @return Read UUID.
+     * @throws IOException If read failed.
+     */
+    @Nullable public static UUID readUuid(BinaryRawReader in) {
+        // If UUID is not null.
+        if (in.readBoolean()) {
+            long most = in.readLong();
+            long least = in.readLong();
+
+            return new UUID(most, least);
+        }
+        else
+            return null;
+    }
+
+    /**
      * Writes {@link org.apache.ignite.lang.IgniteUuid} to output stream. This method is meant to be used by
      * implementations of {@link Externalizable} interface.
      *