You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by arunmahadevan <gi...@git.apache.org> on 2017/08/01 06:34:36 UTC

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2218#discussion_r130528788
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
    @@ -0,0 +1,596 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.topology;
    +
    +import java.util.AbstractCollection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Deque;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Supplier;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.State;
    +import org.apache.storm.state.StateFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.DefaultEvictionContext;
    +import org.apache.storm.windowing.Event;
    +import org.apache.storm.windowing.EventImpl;
    +import org.apache.storm.windowing.WindowLifecycleListener;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static java.util.Collections.emptyIterator;
    +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
    +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
    +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener;
    +
    +/**
    + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying
    + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory
    + * by transparently caching the window partitions and checkpointing them as needed.
    + */
    +public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
    +    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
    +    // the wrapped user bolt; also handed to the superclass constructor
    +    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    +    private transient TopologyContext topologyContext;
    +    // collector used to ack input tuples in execute()
    +    private transient OutputCollector outputCollector;
    +    // checkpointed window contents (partitions, partition ids, window system state); built in init()
    +    private transient WindowState<Tuple> state;
    +    // set once initState() has run; start() and execute() are gated on it
    +    private transient boolean stateInitialized;
    +    // set by prePrepare(); preCommit() requires it once the state is initialized.
    +    // NOTE(review): never reset to false in the visible code, so after the first prePrepare the check in
    +    // preCommit always passes -- confirm this is intended.
    +    private transient boolean prePrepared;
    +
    +    /**
    +     * Creates an executor around the given stateful windowed bolt.
    +     *
    +     * @param bolt the user bolt; passed to the superclass and kept for the state callbacks
    +     */
    +    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
    +        super(bolt);
    +        this.statefulWindowedBolt = bolt;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    +        List<String> registrations = (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
    +        registrations.add(ConcurrentLinkedQueue.class.getName());
    +        registrations.add(LinkedList.class.getName());
    +        registrations.add(AtomicInteger.class.getName());
    +        registrations.add(EventImpl.class.getName());
    +        registrations.add(WindowPartition.class.getName());
    +        registrations.add(DefaultEvictionContext.class.getName());
    +        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
    +        prepare(topoConf, context, collector,
    +            getWindowState(topoConf, context),
    +            getPartitionState(topoConf, context),
    +            getWindowSystemState(topoConf, context));
    +    }
    +
    +    /**
    +     * Validates the window configuration for persistent windowing.
    +     *
    +     * @throws IllegalArgumentException if no window length is configured, or if the checkpoint interval is
    +     *     longer than the topology message timeout (input tuples are only really acked when the state is
    +     *     saved, so a longer interval would let them time out)
    +     */
    +    @Override
    +    protected void validate(Map<String, Object> topoConf,
    +                            BaseWindowedBolt.Count windowLengthCount,
    +                            BaseWindowedBolt.Duration windowLengthDuration,
    +                            BaseWindowedBolt.Count slidingIntervalCount,
    +                            BaseWindowedBolt.Duration slidingIntervalDuration) {
    +        if (windowLengthCount == null && windowLengthDuration == null) {
    +            throw new IllegalArgumentException("Window length is not specified");
    +        }
    +        int interval = getCheckpointIntervalMillis(topoConf);
    +        int timeout = getTopologyTimeoutMillis(topoConf);
    +        if (interval > timeout) {
    +            // both values are in milliseconds; the separator and units were missing from the message
    +            throw new IllegalArgumentException(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL + " value " + interval +
    +                " ms is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                " value " + timeout + " ms");
    +        }
    +    }
    +
    +    /**
    +     * Reads the configured state checkpoint interval.
    +     *
    +     * @return the configured value as an int, or {@link Integer#MAX_VALUE} when the key has no value
    +     */
    +    private int getCheckpointIntervalMillis(Map<String, Object> topoConf) {
    +        Object configured = topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL);
    +        return configured == null ? Integer.MAX_VALUE : ((Number) configured).intValue();
    +    }
    +
    +    /**
    +     * Wires the executor to the supplied states and restores the saved window system state.
    +     * Package access so unit tests can inject their own states.
    +     */
    +    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
    +                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
    +                 KeyValueState<String, Deque<Long>> partitionState,
    +                 KeyValueState<String, Optional<?>> windowSystemState) {
    +        init(topoConf, context, collector, windowState, partitionState, windowSystemState);
    +        doPrepare(topoConf, context, new NoAckOutputCollector(collector), state, true);
    +        Map<String, Optional<?>> restored = new HashMap<>();
    +        windowSystemState.forEach(entry -> restored.put(entry.getKey(), entry.getValue()));
    +        restoreState(restored);
    +    }
    +
    +    /**
    +     * Starts windowing only after the state is initialized; initState() calls start() again once it has run.
    +     */
    +    @Override
    +    protected void start() {
    +        if (!stateInitialized) {
    +            LOG.debug("Will invoke start after state is initialized");
    +            return;
    +        }
    +        super.start();
    +    }
    +
    +    /**
    +     * Feeds the tuple to the windowing logic and acks it through the wrapped collector.
    +     *
    +     * @throws IllegalStateException if called before initState()
    +     */
    +    @Override
    +    public void execute(Tuple input) {
    +        if (stateInitialized) {
    +            super.execute(input);
    +            // StatefulBoltExecutor does the actual ack when the state is saved.
    +            outputCollector.ack(input);
    +        } else {
    +            throw new IllegalStateException("execute invoked before initState with input tuple " + input);
    +        }
    +    }
    +
    +    /**
    +     * Hands the user state to the wrapped bolt, marks the state initialized and starts windowing.
    +     *
    +     * @throws IllegalStateException if the state was already initialized
    +     */
    +    @Override
    +    public void initState(T state) {
    +        if (!stateInitialized) {
    +            statefulWindowedBolt.initState(state);
    +            stateInitialized = true;
    +            start();
    +            return;
    +        }
    +        String msg = "initState invoked when the state is already initialized";
    +        LOG.warn(msg);
    +        throw new IllegalStateException(msg);
    +    }
    +
    +    /**
    +     * Prepares the window state for commit of {@code txid} and records that a prepare happened.
    +     *
    +     * @throws IllegalStateException if called before initState()
    +     */
    +    @Override
    +    public void prePrepare(long txid) {
    +        if (!stateInitialized) {
    +            String msg = "Cannot prepare before initState";
    +            LOG.warn(msg);
    +            throw new IllegalStateException(msg);
    +        }
    +        LOG.debug("Prepare streamState, txid {}", txid);
    +        state.prepareCommit(txid);
    +        prePrepared = true;
    +    }
    +
    +    @Override
    +    public void preCommit(long txid) {
    +        // preCommit can be invoked during recovery before the state is initialized
    +        if (prePrepared || !stateInitialized) {
    +            LOG.debug("Commit streamState, txid {}", txid);
    +            state.commit(txid);
    +        } else {
    +            String msg = "preCommit before prePrepare in initialized state";
    +            LOG.warn(msg);
    +            throw new IllegalStateException(msg);
    +        }
    +    }
    +
    +    /**
    +     * Rolls the window state back; runs whether or not initState() has been called.
    +     */
    +    @Override
    +    public void preRollback() {
    +        LOG.debug("Rollback streamState, stateInitialized {}", stateInitialized);
    +        state.rollback();
    +    }
    +
    +    /**
    +     * Creates the listener that runs the wrapped bolt on each window activation. Expiry is a no-op because
    +     * tuples are acked in execute().
    +     */
    +    @Override
    +    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
    +        return new WindowLifecycleListener<Tuple>() {
    +            @Override
    +            public void onExpiry(List<Tuple> events) {
    +                /*
    +                 * NO-OP: the events are ack-ed in execute
    +                 */
    +            }
    +
    +            @Override
    +            public void onActivation(Supplier<Iterator<Tuple>> eventsIt,
    +                                     Supplier<Iterator<Tuple>> newEventsIt,
    +                                     Supplier<Iterator<Tuple>> expiredIt,
    +                                     Long timestamp) {
    +                /*
    +                 * Here we don't set the tuples in windowedOutputCollector's context and emit un-anchored.
    +                 * The checkpoint tuple will trigger a checkpoint in the receiver with the emitted tuples.
    +                 */
    +                try {
    +                    boltExecute(eventsIt, newEventsIt, expiredIt, timestamp);
    +                } finally {
    +                    // Window iterators pin cache partitions and only unpin them once exhausted; release any
    +                    // pins left behind even when the bolt stops iterating early or throws.
    +                    state.clearIteratorPins();
    +                }
    +            }
    +        };
    +    }
    +
    +    /**
    +     * Captures the context and collector and wraps the three checkpointed key-value states in a
    +     * {@link WindowState}. {@code topoConf} is accepted but not used here.
    +     */
    +    private void init(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
    +                      KeyValueState<Long, WindowPartition<Tuple>> windowState,
    +                      KeyValueState<String, Deque<Long>> partitionState,
    +                      KeyValueState<String, Optional<?>> windowSystemState) {
    +        outputCollector = collector;
    +        topologyContext = context;
    +        long maxEvents = statefulWindowedBolt.maxEventsInMemory();
    +        state = new WindowState<>(windowState, partitionState, windowSystemState, this::getState, maxEvents);
    +    }
    +
    +    // Window partitions keyed by partition id.
    +    @SuppressWarnings("unchecked")
    +    private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = stateNamespace(context, "window");
    +        return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    // Ordered partition ids.
    +    @SuppressWarnings("unchecked")
    +    private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = stateNamespace(context, "window-partitions");
    +        return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    // Window system state restored via restoreState().
    +    @SuppressWarnings("unchecked")
    +    private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
    +        String namespace = stateNamespace(context, "window-systemstate");
    +        return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
    +    }
    +
    +    // Builds the per-task state namespace "<componentId>-<taskId>-<suffix>", so each task has its own state.
    +    private static String stateNamespace(TopologyContext context, String suffix) {
    +        return context.getThisComponentId() + "-" + context.getThisTaskId() + "-" + suffix;
    +    }
    +
    +    // a wrapper around the window related states that are checkpointed
    +    private static class WindowState<T> extends AbstractCollection<Event<T>> {
    +        // number of events per window-partition; add() starts a new partition once this is reached
    +        private static final int MAX_PARTITION_EVENTS = 1000;
    +        // maxEventsInMemory is raised to at least MAX_PARTITION_EVENTS * MIN_PARTITIONS in the constructor
    +        private static final int MIN_PARTITIONS = 10;
    +        // the single key under which the partition id deque is stored in partitionIdsState
    +        private static final String PARTITION_IDS_KEY = "pk";
    +        // checkpointed ordered partition ids, stored under PARTITION_IDS_KEY
    +        private final KeyValueState<String, Deque<Long>> partitionIdsState;
    +        // checkpointed partitions keyed by id; written/deleted by the cache removal listener, read by the loader
    +        private final KeyValueState<Long, WindowPartition<T>> windowPartitionsState;
    +        private final KeyValueState<String, Optional<?>> windowSystemState;
    +        // ordered partition keys (the same deque object that is stored in partitionIdsState)
    +        private Deque<Long> partitionIds;
    +        // id of the partition receiving new events; read by the cache removal listener (presumably why volatile)
    +        private volatile long latestPartitionId;
    +        // in-memory partition cache, bounded to maxEventsInMemory / MAX_PARTITION_EVENTS entries
    +        private WindowPartitionCache<Long, WindowPartition<T>> cache;
    +        // assigned in the constructor; NOTE(review): its use is outside the visible portion of this class
    +        private Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
    +        // guards partitionIds (iterator id snapshot, deletePartition)
    +        private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
    +        // guards windowPartitionsState access from the cache removal listener and loader
    +        private final ReentrantLock windowPartitionsLock = new ReentrantLock(true);
    +        private final long maxEventsInMemory;
    +        // partition currently receiving new events; held pinned in the cache
    +        private WindowPartition<T> latestPartition;
    +        // ids of partitions pinned by iterators; released by clearIteratorPins() if iteration stops early
    +        private Set<Long> iteratorPins = new HashSet<>();
    +
    +        /**
    +         * Wraps the checkpointed states, builds the partition cache and pins the latest partition.
    +         *
    +         * @param maxEventsInMemory requested in-memory event budget; raised to at least
    +         *     MAX_PARTITION_EVENTS * MIN_PARTITIONS
    +         */
    +        WindowState(KeyValueState<Long, WindowPartition<T>> windowPartitionsState,
    +                    KeyValueState<String, Deque<Long>> partitionIdsState,
    +                    KeyValueState<String, Optional<?>> windowSystemState,
    +                    Supplier<Map<String, Optional<?>>> windowSystemStateSupplier,
    +                    long maxEventsInMemory) {
    +            this.partitionIdsState = partitionIdsState;
    +            this.windowPartitionsState = windowPartitionsState;
    +            this.windowSystemState = windowSystemState;
    +            this.windowSystemStateSupplier = windowSystemStateSupplier;
    +            long floor = MAX_PARTITION_EVENTS * MIN_PARTITIONS;
    +            this.maxEventsInMemory = maxEventsInMemory < floor ? floor : maxEventsInMemory;
    +            // the cache must exist before initPartitions() pins the latest partition through it
    +            initCache();
    +            initPartitions();
    +        }
    +
    +        @Override
    +        public boolean add(Event<T> event) {
    +            if (latestPartition.size() >= MAX_PARTITION_EVENTS) {
    +                cache.unpin(latestPartition.getId());
    +                latestPartition = getPinnedPartition(getNextPartitionId());
    +            }
    +            latestPartition.add(event);
    +            return true;
    +        }
    +
    +        /**
    +         * Returns an iterator over all events, partition by partition in id order. Each partition is pinned in
    +         * the cache while it is iterated and unpinned when the iterator moves past it or is exhausted. Ids pinned
    +         * by the iterator are also recorded in iteratorPins, so clearIteratorPins() can release them if iteration
    +         * stops early. remove() deletes the last returned event from its partition.
    +         */
    +        @Override
    +        public Iterator<Event<T>> iterator() {
    +            return new Iterator<Event<T>>() {
    +                // snapshot of the partition ids taken when the iterator is created
    +                private Iterator<Long> ids = getIds();
    +                private Iterator<Event<T>> current = emptyIterator();
    +                // iterator that produced the last element returned by next(); cleared by remove()
    +                private Iterator<Event<T>> removeFrom;
    +                private WindowPartition<T> curPartition;
    +
    +                private Iterator<Long> getIds() {
    +                    // lock() before try: if lock() failed inside try, finally would unlock a lock we don't hold
    +                    partitionIdsLock.lock();
    +                    try {
    +                        LOG.debug("Iterator partitionIds: {}", partitionIds);
    +                        return new ArrayList<>(partitionIds).iterator();
    +                    } finally {
    +                        partitionIdsLock.unlock();
    +                    }
    +                }
    +
    +                @Override
    +                public void remove() {
    +                    if (removeFrom == null) {
    +                        throw new IllegalStateException("No calls to next() since last call to remove()");
    +                    }
    +                    removeFrom.remove();
    +                    removeFrom = null;
    +                }
    +
    +                @Override
    +                public boolean hasNext() {
    +                    boolean curHasNext = current.hasNext();
    +                    // advance through partitions until one yields an event or the ids run out
    +                    while (!curHasNext && ids.hasNext()) {
    +                        if (curPartition != null) {
    +                            unpin(curPartition.getId());
    +                        }
    +                        curPartition = getPinnedPartition(ids.next());
    +                        if (curPartition != null) {
    +                            iteratorPins.add(curPartition.getId());
    +                            current = curPartition.iterator();
    +                            curHasNext = current.hasNext();
    +                        }
    +                    }
    +                    // un-pin the last partition
    +                    if (!curHasNext && curPartition != null) {
    +                        unpin(curPartition.getId());
    +                        curPartition = null;
    +                    }
    +                    return curHasNext;
    +                }
    +
    +                @Override
    +                public Event<T> next() {
    +                    if (!hasNext()) {
    +                        throw new NoSuchElementException();
    +                    }
    +                    removeFrom = current;
    +                    return current.next();
    +                }
    +
    +                // releases the cache pin and forgets it in iteratorPins
    +                private void unpin(long id) {
    +                    cache.unpin(id);
    +                    iteratorPins.remove(id);
    +                }
    +            };
    +        }
    +
    +        /**
    +         * Unpins every partition still held by iterators, removing each id from iteratorPins as it goes.
    +         */
    +        void clearIteratorPins() {
    +            LOG.debug("clearIteratorPins '{}'", iteratorPins);
    +            for (Iterator<Long> pinned = iteratorPins.iterator(); pinned.hasNext(); ) {
    +                Long id = pinned.next();
    +                cache.unpin(id);
    +                pinned.remove();
    +            }
    +        }
    +
    +        /**
    +         * Not supported; callers must iterate instead. Note that AbstractCollection methods built on size()
    +         * (such as isEmpty() and toArray()) will throw as well.
    +         *
    +         * @throws UnsupportedOperationException always
    +         */
    +        @Override
    +        public int size() {
    +            throw new UnsupportedOperationException();
    +        }
    +
    +        // Flushes cached partitions, then prepares every checkpointed state for txid.
    +        void prepareCommit(long txid) {
    +            flush();
    +            for (State checkpointed : checkpointedStates()) {
    +                checkpointed.prepareCommit(txid);
    +            }
    +        }
    +
    +        // Commits every checkpointed state for txid.
    +        void commit(long txid) {
    +            for (State checkpointed : checkpointedStates()) {
    +                checkpointed.commit(txid);
    +            }
    +        }
    +
    +        // Rolls back every checkpointed state.
    +        void rollback() {
    +            for (State checkpointed : checkpointedStates()) {
    +                checkpointed.rollback();
    +            }
    +        }
    +
    +        // The checkpointed states in a fixed order: partition ids, window partitions, window system state.
    +        private List<State> checkpointedStates() {
    +            List<State> states = new ArrayList<>(3);
    +            states.add(partitionIdsState);
    +            states.add(windowPartitionsState);
    +            states.add(windowSystemState);
    +            return states;
    +        }
    +
    +        // Loads the saved partition ids (seeding id 0 and saving it when none exist) and pins the last one.
    +        private void initPartitions() {
    +            Deque<Long> ids = partitionIdsState.get(PARTITION_IDS_KEY, new LinkedList<>());
    +            partitionIds = ids;
    +            if (ids.isEmpty()) {
    +                ids.add(0L);
    +                partitionIdsState.put(PARTITION_IDS_KEY, ids);
    +            }
    +            long lastId = ids.peekLast();
    +            latestPartitionId = lastId;
    +            latestPartition = cache.getPinned(lastId);
    +        }
    +
    +        // Builds the partition cache with maxEventsInMemory / MAX_PARTITION_EVENTS entries. Partitions leaving
    +        // the cache are written back to windowPartitionsState when modified, or deleted when empty and evicted;
    +        // cache misses are loaded from windowPartitionsState, falling back to a new WindowPartition(id).
    +        private void initCache() {
    +            long size = maxEventsInMemory / MAX_PARTITION_EVENTS;
    +            LOG.info("maxEventsInMemory: {}, partition size: {}, number of partitions: {}",
    +                maxEventsInMemory, MAX_PARTITION_EVENTS, size);
    +            cache = SimpleWindowPartitionCache.<Long, WindowPartition<T>>newBuilder()
    +                .maximumSize(size)
    +                .removalListener(new RemovalListener<Long, WindowPartition<T>>() {
    +                    @Override
    +                    public void onRemoval(Long pid, WindowPartition<T> p, RemovalCause removalCause) {
    +                        Objects.requireNonNull(pid, "Null partition id");
    +                        Objects.requireNonNull(p, "Null window partition");
    +                        LOG.debug("onRemoval for id '{}', WindowPartition '{}'", pid, p);
    +                        // lock() before try so finally never unlocks a lock this thread failed to acquire
    +                        windowPartitionsLock.lock();
    +                        try {
    +                            if (p.isEmpty() && pid != latestPartitionId) {
    +                                // if the empty partition was not invalidated by flush, but evicted from cache
    +                                if (removalCause != RemovalCause.EXPLICIT) {
    +                                    deletePartition(pid);
    +                                    windowPartitionsState.delete(pid);
    +                                }
    +                            } else if (p.isModified()) {
    +                                windowPartitionsState.put(pid, p);
    +                            } else {
    +                                LOG.debug("WindowPartition '{}' is not modified", pid);
    +                            }
    +                        } finally {
    +                            windowPartitionsLock.unlock();
    +                        }
    +                    }
    +                }).build(new CacheLoader<Long, WindowPartition<T>>() {
    +                    @Override
    +                    public WindowPartition<T> load(Long id) {
    +                        LOG.debug("Load partition: {}", id);
    +                        // load from state
    +                        windowPartitionsLock.lock();
    +                        try {
    +                            return windowPartitionsState.get(id, new WindowPartition<>(id));
    +                        } finally {
    +                            windowPartitionsLock.unlock();
    +                        }
    +                    }
    +                });
    +        }
    +
    +        private void deletePartition(long pid) {
    +            LOG.debug("Delete partition: {}", pid);
    +            try {
    +                partitionIdsLock.lock();
    +                partitionIds.remove(pid);
    +                partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
    --- End diff --
    
    OK, I wrongly assumed the comment was about `initPartitions`, so please disregard my previous comment.
    Here we first update the `partitionIds` member variable; the `put` then saves that value into the KV state. The `partitionIdsState` state has a single entry, something like `"key" -> list of ids`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---