Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/09/26 19:27:32 UTC

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2345

     STORM-2438: added in rebalance changes to support RAS

    This is based off #2339 (which is in the first commit).  If people want to review it all together I am fine with closing the other one.
    
    This does a few things:
    
    1) It updates the AsyncLocalizer and Slot so that when the jar, conf, or serialized topology changes, the new code is downloaded to a temp location, the workers are shot, the changes are made, and the workers are brought back up.
    2) It adds in changes to the rebalance command so you can adjust config options and resources in addition to what was there before.
    
    I plan on adding support for generic blobs to optionally cause the workers to be restarted on update. This would be for things like jars on the classpath.  I plan to do this work in a separate pull request because this is already very large. 
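
An editorial sketch of the update sequence in item 1 above (download to a temp location, stop the workers, commit, restart). It is not taken from the patch: `StagedBlobUpdateSketch`, `Worker`, `CachedBlob`, and every method name are hypothetical stand-ins, and the real logic lives in AsyncLocalizer, Slot, and LocallyCachedBlob.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these types exist in Storm.
class StagedBlobUpdateSketch {
    /** Stand-in for a worker process that a slot can stop and restart. */
    interface Worker {
        void stop();
        void start();
    }

    /** Worker that records what happened to it instead of managing a process. */
    static class RecordingWorker implements Worker {
        private final String name;
        private final List<String> events;

        RecordingWorker(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void stop() {
            events.add("stop " + name);
        }

        @Override
        public void start() {
            events.add("start " + name);
        }
    }

    /** Stand-in for a locally cached blob with a live and a staged version. */
    static class CachedBlob {
        private long liveVersion = -1;   // -1 means nothing has been downloaded yet
        private long stagedVersion = -1;

        long getLiveVersion() {
            return liveVersion;
        }

        void downloadToTemp(long remoteVersion, List<String> events) {
            stagedVersion = remoteVersion;
            events.add("download v" + remoteVersion + " to temp");
        }

        void commit(List<String> events) {
            liveVersion = stagedVersion;
            events.add("commit v" + liveVersion);
        }
    }

    /** Stage the new version first, then stop workers, commit, and restart them. */
    static void update(CachedBlob blob, long remoteVersion, List<Worker> workers, List<String> events) {
        if (blob.getLiveVersion() == remoteVersion) {
            return; // already up to date, so the workers are left alone
        }
        blob.downloadToTemp(remoteVersion, events);
        for (Worker w : workers) {
            w.stop();
        }
        blob.commit(events);
        for (Worker w : workers) {
            w.start();
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<Worker> workers = new ArrayList<>();
        workers.add(new RecordingWorker("worker-6700", events));
        CachedBlob blob = new CachedBlob();
        update(blob, 2L, workers, events); // version changed: full sequence runs
        update(blob, 2L, workers, events); // same version: no-op
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Staging the download before the workers are stopped limits the downtime to the commit step; the second call in `demo()` records nothing because the versions already match.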

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2438

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2345
    
----
commit 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-09-15T18:29:40Z

    STORM-2084: Refactor localization to combine files together

commit d9deab6dbac9fccccb961c437ac0713988993002
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-09-21T15:55:24Z

    STORM-2438: added in rebalance changes to support RAS

----


---

[GitHub] storm issue #2345: STORM-2438: added in rebalance changes to support RAS

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2345
  
    For anyone interested I just rebased to fix a very minor merge conflict.


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143576368
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -156,17 +146,141 @@ public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalA
                 DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
     
             execService = Executors.newScheduledThreadPool(threadPoolSize,
    -            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor").build());
    +            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
             reconstructLocalizedResources();
     
             symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
    -        basicPending = new HashMap<>();
             blobPending = new HashMap<>();
             this.currAssignment = currAssignment;
     
             recoverBlobReferences(portToAssignments);
         }
     
    +    public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalAssignment>> currAssignment,
    +                          Map<Integer, LocalAssignment> portToAssignments) throws IOException {
    +        this(conf, AdvancedFSOps.make(conf), ConfigUtils.supervisorLocalDir(conf), currAssignment, portToAssignments);
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoJar(final String topologyId) throws IOException {
    +        String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
    +        LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey);
    +        if (topoJar == null) {
    +            topoJar = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
    +            topologyBlobs.put(topoJarKey, topoJar);
    +        }
    +        return topoJar;
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoCode(final String topologyId) throws IOException {
    +        String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
    +        LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey);
    +        if (topoCode == null) {
    +            topoCode = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
    +            topologyBlobs.put(topoCodeKey, topoCode);
    +        }
    +        return topoCode;
    +    }
    +
    +    @VisibleForTesting
    +    LocallyCachedBlob getTopoConf(final String topologyId) throws IOException {
    +        String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
    +        LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey);
    +        if (topoConf == null) {
    +            topoConf = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
    +                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
    +            topologyBlobs.put(topoConfKey, topoConf);
    +        }
    +        return topoConf;
    +    }
    +
    +    public synchronized CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
    +                                                                             final BlobChangingCallback cb) throws IOException {
    +        final String topologyId = assignment.get_topology_id();
    +
    +        CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(assignment, port, cb);
    +        return baseBlobs.thenComposeAsync((v) -> {
    +            LocalDownloadedResource localResource = blobPending.get(topologyId);
    +            if (localResource == null) {
    +                Supplier<Void> supplier = new DownloadBlobs(topologyId, assignment.get_owner());
    +                localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
    +                blobPending.put(topologyId, localResource);
    +            }
    +            CompletableFuture<Void> r = localResource.reserve(port, assignment);
    +            LOG.debug("Reserved blobs {} {}", topologyId, localResource);
    +            return r;
    +        });
    +    }
    +
    +    @VisibleForTesting
    +    synchronized CompletableFuture<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port,
    +                                                                          BlobChangingCallback cb) throws IOException {
    +        PortAndAssignment pna = new PortAndAssignment(port, assignment);
    +        final String topologyId = assignment.get_topology_id();
    +
    +        LocallyCachedBlob topoJar = getTopoJar(topologyId);
    +        topoJar.addReference(pna, cb);
    +
    +        LocallyCachedBlob topoCode = getTopoCode(topologyId);
    +        topoCode.addReference(pna, cb);
    +
    +        LocallyCachedBlob topoConf = getTopoConf(topologyId);
    +        topoConf.addReference(pna, cb);
    +
    +        CompletableFuture<Void> ret = topologyBasicDownloaded.get(topologyId);
    +        if (ret == null) {
    +            ret = downloadOrUpdate(topoJar, topoCode, topoConf);
    +        }
    +        return ret;
    +    }
    +
    +    private static final int ATTEMPTS_INTERVAL_TIME = 100;
    +
    +    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ... blobs) {
    +        CompletableFuture<Void> [] all = new CompletableFuture[blobs.length];
    +        for (int i = 0; i < blobs.length; i++) {
    +            final LocallyCachedBlob blob = blobs[i];
    +            all[i] = CompletableFuture.runAsync(() -> {
    +                LOG.debug("STARTING download of {}", blob);
    +                try (ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(conf)) {
    +                    boolean done = false;
    +                    long failures = 0;
    +                    while (!done) {
    +                        try {
    +                            synchronized (blob) {
    +                                long localVersion = blob.getLocalVersion();
    +                                long remoteVersion = blob.getRemoteVersion(blobStore);
    +                                if (localVersion != remoteVersion) {
    +                                    try {
    +                                        long newVersion = blob.downloadToTempLocation(blobStore);
    +                                        blob.informAllOfChangeAndWaitForConsensus();
    +                                        blob.commitNewVersion(newVersion);
    +                                        blob.informAllChangeComplete();
    +                                    } finally {
    +                                        blob.cleanupOrphanedData();
    +                                    }
    +                                }
    +                            }
    +                            done = true;
    +                        } catch (Exception e) {
    +                            failures++;
    +                            if (failures > blobDownloadRetries) {
    +                                throw new RuntimeException("Could not download...", e);
    +                            }
    +                            LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
    +                            Utils.sleep(ATTEMPTS_INTERVAL_TIME);
    +                        }
    +                    }
    +                }
    +                LOG.debug("FINISHED download of {}", blob);
    --- End diff --
    
    I used it for debugging.  If you see a good reason for it to be info I can make it info, but I didn't see a lot of value in it.


---

[GitHub] storm issue #2345: STORM-2438: added in rebalance changes to support RAS

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2345
  
    I have done a lot of manual testing with this patch, but if others want to try it out feel free to kick the tires.  Any feedback is appreciated.


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143501314
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -263,7 +314,41 @@ public String toString() {
                 return "{ " + topoId + ": " + request + " }";
             }
         }
    -    
    +
    +    /**
    +     * Holds the information about a blob that is changing.
    +     */
    +    static class BlobChangeing {
    --- End diff --
    
    Does this need to be `BlobChanging`?


---

[GitHub] storm issue #2345: STORM-2438: added in rebalance changes to support RAS

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the issue:

    https://github.com/apache/storm/pull/2345
  
    @revans2 LGTM.
    



---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143575205
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -104,9 +111,12 @@
                 this.port = port;
                 this.iSupervisor = iSupervisor;
                 this.localState = localState;
    +            this.changingCallback = changingCallback;
             }
         }
    -    
    +
    +    //TODO go through all of the state transitions and make sure we handle changingBlobs
    +    //TODO make sure to add in transition helpers that clean changingBlobs && pendingChangeingBlobs for not the current topology
    --- End diff --
    
    Yes, I left in a lot of TODOs. #2363 cleans them up, but I will try to pull back what I can here.
    
    In this particular case I did go through the transitions and I think I have it all covered.


---

[GitHub] storm issue #2345: STORM-2438: added in rebalance changes to support RAS

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2345
  
    @kishorvpatil I think I addressed your review comments.  If you really want the log message to be info I can do it.


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2345


---

[GitHub] storm issue #2345: STORM-2438: added in rebalance changes to support RAS

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2345
  
    For anyone interested I just rebased as #2339 was merged into master.


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143559023
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---
    @@ -0,0 +1,220 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.localizer;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.LinkOption;
    +import java.nio.file.Path;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.generated.AuthorizationException;
    +import org.apache.storm.generated.KeyNotFoundException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Represents a blob that is cached locally on disk by the supervisor.
    + */
    +public abstract class LocallyCachedBlob {
    +    private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class);
    +    public static final long NOT_DOWNLOADED_VERSION = -1;
    +    // A callback that does nothing.
    +    private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> {};
    +
    +    private long lastUsed = System.currentTimeMillis();
    +    private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>();
    +    private final String blobDescription;
    +    private final String blobKey;
    +    private CompletableFuture<Void> doneUpdating = null;
    +
    +    /**
    +     * Create a new LocallyCachedBlob.
    +     * @param blobDescription a description of the blob this represents.  Typically it should at least be the blob key, but ideally also
    +     * include if it is an archive or not, what user or topology it is for, or if it is a storm.jar etc.
    +     */
    +    protected LocallyCachedBlob(String blobDescription, String blobKey) {
    +        this.blobDescription = blobDescription;
    +        this.blobKey = blobKey;
    +    }
    +
    +    /**
    +     * Get the version of the blob cached locally.  If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION
    +     * should be returned.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     */
    +    public abstract long getLocalVersion();
    +
    +    /**
    +     * Get the version of the blob in the blob store.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     */
    +    public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException;
    +
    +    /**
    +     * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     * @param store the store to use to download the data.
    +     * @return the version that was downloaded.
    +     */
    +    public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException;
    +
    +    /**
    +     * Commit the new version and make it available for the end user.
    +     * PRECONDITION: uncompressToTempLocationIfNeeded will have been called.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     * @param version the version of the blob to commit.
    +     */
    +    public abstract void commitNewVersion(long version) throws IOException;
    +
    +    /**
    +     * Clean up any temporary files.  This will be called after updating a blob, either successfully or if an error has occurred.
    +     * The goal is to find any files that may be left over and remove them so space is not leaked.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     */
    +    public abstract void cleanupOrphanedData() throws IOException;
    +
    +    /**
    +     * Completely remove anything that is cached locally for this blob and all tracking files also stored for it.
    +     * This will be called after the blob was determined to no longer be needed in the cache.
    +     * PRECONDITION: this can only be called with a lock on this instance held.
    +     */
    +    public abstract void completelyRemove() throws IOException;
    +
    +    /**
    +     * Get the amount of disk space that is used by this blob.  If the blob is uncompressed it should be the sum of the space used by all
    +     * of the uncompressed files.  In general this will not be called with any locks held so it is a good idea to cache it and update it
    +     * when committing a new version.
    +     */
    +    public abstract long getSizeOnDisk();
    +
    +    /**
    +     * Updates the last updated time.  This should be called when references are added or removed.
    +     */
    +    private synchronized void touch() {
    +        lastUsed = System.currentTimeMillis();
    +    }
    +
    +    /**
    +     * Get the last time that this was used for LRU calculations.
    +     */
    +    public synchronized long getLastUsed() {
    +        return lastUsed;
    +    }
    +
    +    /**
    +     * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be).
    +     */
    +    public synchronized boolean isUsed() {
    +        return !references.isEmpty();
    +    }
    +
    +    /**
    +     * Get the size of p in bytes.
    +     * @param p the path to read.
    +     * @return the size of p in bytes.
    +     */
    +    protected long getSizeOnDisk(Path p) throws IOException {
    +        if (!Files.exists(p)) {
    +            return 0;
    +        } else if (Files.isRegularFile(p)) {
    +            return Files.size(p);
    +        } else {
    +            //We will not follow sym links
    +            return Files.walk(p)
    +                .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
    +                .mapToLong((subp) -> {
    +                    try {
    +                        return Files.size(subp);
    +                    } catch (IOException e) {
    +                        LOG.warn("Could not get the size of {}", subp, e);
    +                    }
    +                    return 0;
    +                }).sum();
    +        }
    +    }
    +
    +    /**
    +     * Mark that a given port and assignment are using this.
    +     * @param pna the slot and assignment that are using this blob.
    +     * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated.
    +     */
    +    public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) {
    +        if (cb == null) {
    +            cb = NOOP_CB;
    +        }
    +        if (references.put(pna, cb) != null) {
    +            LOG.warn("{} already has a reservation for {}", pna, blobDescription);
    +        }
    +    }
    +
    +    /**
    +     * Removes a reservation for this blob from a given slot and assignment.
    +     * @param pna the slot + assignment that no longer needs this blob.
    +     */
    +    public void removeReference(final PortAndAssignment pna) {
    +        if (references.remove(pna) == null) {
    +            LOG.warn("{} had no reservation for {}", pna, blobDescription);
    +        }
    +    }
    +
    +    /**
    +     * Inform all of the callbacks that a change is going to happen and then wait for
    +     * them to all get back that it is OK to make that change.
    +     */
    +    public synchronized void informAllOfChangeAndWaitForConsensus() {
    +        CountDownLatch cdl = new CountDownLatch(references.size());
    +        doneUpdating = new CompletableFuture<>();
    +        for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : references.entrySet()) {
    +            GoodToGo gtg = new GoodToGo(cdl, doneUpdating);
    +            try {
    +                PortAndAssignment pna = entry.getKey();
    +                BlobChangingCallback cb = entry.getValue();
    +                //TODO we probably want to not use this, or make it just return something that has less power to modify things
    +                cb.blobChanging(pna.getAssignment(), pna.getPort(), this, gtg);
    +            } finally {
    +                gtg.countDownIfLatchWasNotGotten();
    +            }
    +        }
    +        try {
    +            cdl.await(3, TimeUnit.MINUTES);
    +        } catch (InterruptedException e) {
    +            //TODO need to think about error handling here in general.
    --- End diff --
    
    Should we at least log the error being ignored?
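
One way to address this, sketched under assumptions: log both the timeout and the interrupt, and restore the interrupt flag so callers can still observe it. `AwaitConsensusSketch`, `Result`, and `awaitConsensus` are hypothetical names that do not appear in the patch, and java.util.logging stands in for the slf4j logger used there so that the example is self-contained.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration only; not the code that was merged.
class AwaitConsensusSketch {
    private static final Logger LOG = Logger.getLogger(AwaitConsensusSketch.class.getName());

    enum Result { CONSENSUS, TIMED_OUT, INTERRUPTED }

    /** Wait for the latch and report how the wait ended instead of silently moving on. */
    static Result awaitConsensus(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (latch.await(timeout, unit)) {
                return Result.CONSENSUS;
            }
            // await returned false: the timeout elapsed before the count reached zero
            LOG.warning("Timed out after " + timeout + " " + unit + " waiting for consensus");
            return Result.TIMED_OUT;
        } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while waiting for consensus", e);
            // Restore the flag that was cleared when the exception was thrown
            Thread.currentThread().interrupt();
            return Result.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // Nothing to wait for, so the wait succeeds immediately
        CountDownLatch ready = new CountDownLatch(0);
        System.out.println(awaitConsensus(ready, 10, TimeUnit.MILLISECONDS));

        // Nobody counts down, so the wait ends with a timeout
        CountDownLatch pending = new CountDownLatch(1);
        System.out.println(awaitConsensus(pending, 10, TimeUnit.MILLISECONDS));
    }
}
```

Returning a result lets the caller decide whether to commit the new version anyway or abort the update; which of those is right for the supervisor is left open here.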


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143510721
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -156,17 +146,141 @@ public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalA
    +                LOG.debug("FINISHED download of {}", blob);
    --- End diff --
    
    Should this be info?


---

[GitHub] storm pull request #2345: STORM-2438: added in rebalance changes to support ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2345#discussion_r143499778
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -104,9 +111,12 @@
                 this.port = port;
                 this.iSupervisor = iSupervisor;
                 this.localState = localState;
    +            this.changingCallback = changingCallback;
             }
         }
    -    
    +
    +    //TODO go through all of the state transitions and make sure we handle changingBlobs
    +    //TODO make sure to add in transition helpers that clean changingBlobs && pendingChangeingBlobs for not the current topology
    --- End diff --
    
    ??


---