Posted to issues@flink.apache.org by shixiaogang <gi...@git.apache.org> on 2017/02/20 10:02:30 UTC

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

GitHub user shixiaogang opened a pull request:

    https://github.com/apache/flink/pull/3359

    [FLINK-5544][streaming] Add InternalTimerService implemented in RocksDB

    - Refactor the methods defined in `InternalTimerService`. Common implementation previously in `HeapInternalTimerService` is now moved into `InternalTimerService`.
    - Implement `RocksDBInternalTimerService`, which stores timers in RocksDB and sorts them with an in-memory heap (see the sketch after this list).
    - Implement `InternalTimerServiceTestBase` to verify the implementation of `InternalTimerService`.
    - Update `AbstractStreamOperator` to allow the usage of customized `InternalTimerService`.
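
    As a purely illustrative sketch of that heap (all names here are hypothetical, not part of the PR): only the earliest timer of each RocksDB partition, the partition's "leader", is kept in memory, so the heap size is bounded by the number of partitions while the bulk of the timers stays on disk.

    ```java
    import java.util.Comparator;
    import java.util.PriorityQueue;

    final class LeaderHeapSketch {

        // One entry per RocksDB partition: the partition index and the
        // timestamp of the earliest pending timer in that partition.
        static final class PartitionLeader {
            final int partition;
            final long timestamp;

            PartitionLeader(int partition, long timestamp) {
                this.partition = partition;
                this.timestamp = timestamp;
            }
        }

        // Orders the leaders by timestamp; the top of the heap is always
        // the globally next timer to trigger.
        final PriorityQueue<PartitionLeader> leaderHeap =
                new PriorityQueue<>(Comparator.comparingLong((PartitionLeader l) -> l.timestamp));
    }
    ```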

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink flink-5544

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3359
    
----
commit 341fd97c47336d4f87cea997e134af68f8ef5265
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Date:   2017-02-20T09:55:40Z

    Add InternalTimerService implemented in RocksDB

----


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by EXPjbucher <gi...@git.apache.org>.
Github user EXPjbucher commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    Hey, I was actually looking into this today and was wondering what the status of this is. We have exactly this case: lots of timers causing high memory use, most of which don't need to be in RAM at the same time.


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang Thank you very much, appreciate your efforts on this feature! There is no need to rush this if you are busy right now, the cycle for the 1.6 release is just beginning.


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r141942758
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored. */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located. */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    --- End diff --
    
    Users who use the RocksDB state backend may already have a DBOptions defined, either with their own params or from PredefinedOptions (e.g. PredefinedOptions.DEFAULT). In that case, we should reuse that DBOptions here.
    
    In cases where users don't use the RocksDB state backend, move these options into PredefinedOptions as an enum value and reference it as PredefinedOptions.INTERNAL_TIME_SERVICE.
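
    A minimal sketch of the reuse idea (`TimerDbOptionsResolver` and the `backendDbOptions` handle are hypothetical, not existing Flink API; this assumes the no-arg `createDBOptions()` of `PredefinedOptions` from flink-statebackend-rocksdb):

    ```java
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;

    import org.rocksdb.DBOptions;

    final class TimerDbOptionsResolver {

        // Prefer the DBOptions the user has already configured for the RocksDB
        // state backend; otherwise fall back to a predefined options profile.
        static DBOptions resolveTimerDbOptions(DBOptions backendDbOptions) {
            return backendDbOptions != null
                    ? backendDbOptions
                    : PredefinedOptions.DEFAULT.createDBOptions();
        }
    }
    ```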


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang thanks a lot! I will try to take a look at this as soon as possible.


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang are you still interested in contributing or collaborating on this feature? I think we should try to get this into Flink 1.6. What do you think?


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106673407
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored. */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located. */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    --- End diff --
    
    I wonder if we could ensure all of this already in the factory, so that `IOException`s could be propagated or even handled more properly. This problem would automatically disappear once we reuse the RocksDB instance from the backend.
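
    A minimal sketch of that factory idea (class and method names are illustrative only):

    ```java
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    import java.io.IOException;

    final class RocksDBTimerServiceDirectories {

        // Preparing the directory in a factory lets IOExceptions propagate
        // to the caller instead of being wrapped in RuntimeException inside
        // the constructor.
        static Path prepareDbDirectory(Path dbPath) throws IOException {
            FileSystem fileSystem = dbPath.getFileSystem();
            if (fileSystem.exists(dbPath)) {
                fileSystem.delete(dbPath, true);
            }
            fileSystem.mkdirs(dbPath);
            return dbPath;
        }
    }
    ```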


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106680293
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored. */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located. */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    +			}
    +
    +			nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isCurrentHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer != null) {
    +				if (newHeadTimer.f1 < time) {
    +					throw new IllegalStateException();
    +				}
    +
    +				nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +		return eventTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +		return processingTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers() {
    +		return processingTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers() {
    +		return eventTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers(N namespace) {
    +		return processingTimeHeap.numTimers(namespace);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers(N namespace) {
    +		return eventTimeHeap.numTimers(namespace);
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Partitioning Methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Assigns the given key group to a partition.
    +	 */
    +	private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range");
    +
    +		long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
    +
    +		keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +		if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +			return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +		} else {
    +			return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +		}
    +	}
    +
    +	/**
    +	 * Computes the range of the given partition.
    +	 */
    +	private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(partitionIndex >= 0, "Partition index must not be smaller than zero.");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +		if (partitionIndex >= numFatPartitions) {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +			return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +		} else {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +			return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +			LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +			keySerializer.serialize(rawTimer.f2, outputView);
    +			namespaceSerializer.serialize(rawTimer.f3, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the raw timer.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private Tuple4<Integer, Long, K, N> deserializeRawTimer(byte[] bytes) {
    +		ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    +		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +
    +		try {
    +			int keyGroup = IntSerializer.INSTANCE.deserialize(inputView);
    +			long timestamp = LongSerializer.INSTANCE.deserialize(inputView);
    +			K key = keySerializer.deserialize(inputView);
    +			N namespace = namespaceSerializer.deserialize(inputView);
    +
    +			return new Tuple4<>(keyGroup, timestamp, key, namespace);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the raw timer.", e);
    +		}
    +	}
    +
    +	/**
    +	 * A timer store backed by RocksDB. 
    +	 * 
    +	 * The timers are stored in RocksDB in the order of key groups. To allow 
    +	 * efficient access, the timers are partitioned and the leader of each 
    +	 * partition is stored in an in-memory heap. The top of the heap is 
    +	 * exactly the first timer to trigger. The heap is updated whenever the 
    +	 * partition's leader is updated.
    +	 */
    +	private class PersistentTimerHeap {
    --- End diff --
    
    Following up on this concept: I think you are implementing your own outer heap over e.g. Java's `PriorityQueue` because it lacks an update mechanism. But I think you might simply reuse `org.apache.flink.runtime.operators.sort.PartialOrderPriorityQueue`, which has a method `adjustTop()` that should fit your needs.
    
    Maybe the following design is nice: you keep a `PartialOrderPriorityQueue<RocksTimerHeap>`, where `RocksTimerHeap` replaces the Tuple4 and acts as a facade to RocksDB: the heap methods are translated into RocksDB operations, and the top timer is always kept in the `RocksTimerHeap` object, so that the `PartialOrderPriorityQueue` can be maintained cheaply.
    
    Another disadvantage of just using Tuple4 is that the field names f0, f1, ... are not very readable in the code, and in a dedicated class we could use int and long to represent the primitives, which again saves us a little overhead.
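
    For illustration, a dedicated entry class along these lines (the name `RocksTimer` is made up) could replace the Tuple4:

    ```java
    final class RocksTimer<K, N> implements Comparable<RocksTimer<K, N>> {

        final int keyGroup;    // primitive int instead of a boxed Integer (f0)
        final long timestamp;  // primitive long instead of a boxed Long (f1)
        final K key;           // a readable name instead of f2
        final N namespace;     // a readable name instead of f3

        RocksTimer(int keyGroup, long timestamp, K key, N namespace) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public int compareTo(RocksTimer<K, N> other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }
    ```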


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106673233
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored. */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located. */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    --- End diff --
    
    I think this should be initialized as `KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM` ?
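
    That is, something like the following (`UPPER_BOUND_MAX_PARALLELISM` is `1 << 15`, so this would also tighten the current `1 << 16` and make the field final):

    ```java
    private static final int MAX_PARTITIONS = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
    ```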


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by orsher <gi...@git.apache.org>.
Github user orsher commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    Hi,
    
    Is there any progress here?
    This feature would make our lives much easier!


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670283
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    --- End diff --
    
    I would suggest letting the class implement `Closeable` and declaring `throws IOException` in the signature. E.g. the RocksDB implementation would even want to report `IOException`s.
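
    A sketch of the suggested signature change (only the relevant parts shown):

    ```java
    import java.io.Closeable;
    import java.io.IOException;

    public abstract class InternalTimerService<K, N> implements Closeable {

        // Closeable's contract allows IOException, so e.g. the RocksDB
        // implementation can report I/O failures, and callers can use
        // try-with-resources.
        @Override
        public abstract void close() throws IOException;
    }
    ```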


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106668768
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/pom.xml ---
    @@ -0,0 +1,80 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    I think we should simply integrate the RocksDB timer service in the project flink-statebackend-rocksdb.


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    Because the old code was quite outdated, I have updated the PR and reimplemented RocksDBInternalTimerService from scratch. Some problems mentioned in the previous comments may still exist, but I think we can start a new round of review.


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    Very sorry for the delay. I was engaged in other work over the past months, making Flink capable of handling the enormous data flows of Singles' Day.
    
    RocksDBInternalTimerService is among the improvements done.
    But we adopted a very different implementation, since the initial implementation presented here has several problems:
    * The initial implementation requires RocksDB instances other than the one used in RocksDBKeyedStateBackend, which makes resource configuration very difficult.
    * The snapshotting of RocksDBInternalTimerService here is very inefficient. Though an asynchronous and incremental implementation is possible, it would duplicate much code from RocksDBKeyedStateBackend.
    
    We address these problems by introducing `SecondaryKeyedState`s, which provide non-keyed access methods to the data inside a key group. Similar to normal keyed state, secondary keyed states are partitioned into key groups and are also stored in the backends. Hence these secondary states can also benefit from asynchronous and incremental snapshotting in `RocksDBKeyedStateBackend`.
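
    A purely illustrative sketch of the shape (the interface and its signatures are made up to convey the idea; nothing like this exists in Flink yet):

    ```java
    // The state lives in the keyed backend, partitioned by key group, so it
    // is covered by the backend's asynchronous and incremental snapshots.
    public interface SecondaryKeyedState<T> {

        /** Adds an entry to the given key group, without a current key being set. */
        void add(int keyGroup, T value);

        /** Removes an entry from the given key group. */
        void remove(int keyGroup, T value);

        /** Returns all entries of the given key group. */
        Iterable<T> get(int keyGroup);
    }
    ```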
    
    What do you think of the changes? @StefanRRichter
    
     


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106671770
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -477,7 +478,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     				for (int i = 0; i < noOfTimerServices; i++) {
     					String serviceName = div.readUTF();
     
    -					HeapInternalTimerService<?, ?> timerService = this.timerServices.get(serviceName);
    +					InternalTimerService<?, ?> timerService = this.timerServices.get(serviceName);
    --- End diff --
    
    Maybe you could add a parameter to the factory method that allows us to derive the generic types of the key and namespace (e.g. the type serializers).
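
    For instance, a sketch of such a factory signature (illustrative only, not existing API):

    ```java
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.operators.InternalTimerService;
    import org.apache.flink.streaming.api.operators.Triggerable;

    interface InternalTimerServiceFactory {

        // Passing the serializers lets the call site pin down K and N instead
        // of falling back to wildcards.
        <K, N> InternalTimerService<K, N> create(
                String serviceName,
                TypeSerializer<K> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                Triggerable<K, N> triggerTarget);
    }
    ```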


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r141944634
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored. */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located. */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    --- End diff --
    
    I'd suggest creating a `FlinkRocksDBException` to wrap `RocksDBException` and throw it
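
    A minimal sketch of such a wrapper (the class doesn't exist in Flink; extending IOException is just one option, so that it also fits a `close() throws IOException` signature):

    ```java
    import org.rocksdb.RocksDBException;

    import java.io.IOException;

    public class FlinkRocksDBException extends IOException {

        // Wraps the raw RocksDBException so that callers can catch a
        // Flink-typed exception while keeping the original cause.
        public FlinkRocksDBException(String message, RocksDBException cause) {
            super(message, cause);
        }
    }
    ```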


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r141944569
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database that stores all timers */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to
    +	 * trigger. Each partition's leader is stored in the heap. When the timers in a
    +	 * partition change, we update the partition's leader and the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    --- End diff --
    
    I'd suggest creating a `FlinkRocksDBException` to wrap `RocksDBException` and throw it


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106674160
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database that stores all timers */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to
    +	 * trigger. Each partition's leader is stored in the heap. When the timers in a
    +	 * partition change, we update the partition's leader and the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    --- End diff --
    
    I think we can safely initialize this to `keyGroupRange.getNumberOfKeyGroups()`, which is required to be smaller than the maximum allowed parallelism (`max_parallelism`). A preconditions check might be ok, though.
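
    A sketch of that variant, assuming `MAX_PARTITIONS` is kept around as the bound to check:
    
        Preconditions.checkArgument(
                keyGroupRange.getNumberOfKeyGroups() <= MAX_PARTITIONS,
                "The number of key groups (%s) exceeds the maximum partition count (%s).",
                keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
        
        this.numPartitions = keyGroupRange.getNumberOfKeyGroups();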


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    One additional comment, also as reminder for @aljoscha and me: after this PR is rebased, we have access to `InternalKeyContext`, which should be somehow integrated with the already existing `KeyContext` interface. This would reduce several constructor parameters and some members, e.g. the `KeyGroupRange`s and `TypeSerializer<K>`s.
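
    For illustration only, the merged context might look roughly like this (the method
    names here are assumptions, not the actual `InternalKeyContext` API):
    
        public interface InternalKeyContext<K> extends KeyContext {
        
            TypeSerializer<K> getKeySerializer();
        
            KeyGroupRange getKeyGroupRange();
        
            int getNumberOfKeyGroups();
        }
    
    The timer service constructor could then take a single context object instead of the
    separate `totalKeyGroups`, `keyGroupRange`, and `keyContext` parameters.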


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @StefanRRichter Sorry for the delayed response.  I am working on it and shall update the PR by this weekend.


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670661
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when event time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    +	
    +	public void advanceWatermark(long watermark) throws Exception {
    +		if (watermark < currentWatermark) {
    +			throw new IllegalStateException("The watermark is late.");
    +		}
    +		
    +		currentWatermark = watermark;
    +		
    +		onEventTime(watermark);
    +	}
    +
    +	/**
    +	 * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to write to.
    +	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +	 */
    +	public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
    +		InstantiationUtil.serializeObject(stream, keySerializer);
    +		InstantiationUtil.serializeObject(stream, namespaceSerializer);
    +
    +		// write the event time timers
    +		Collection<InternalTimer<K, N>> eventTimers = getEventTimeTimersForKeyGroup(keyGroupIdx);
    --- End diff --
    
    We could refactor out a method that writes `Collection<InternalTimer<K, N>>` and call it twice to de-duplicate code.
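
    A sketch of such a helper (the method name is made up), using only names that already
    appear in this diff:
    
        private void snapshotTimerCollection(
                Collection<InternalTimer<K, N>> timers,
                DataOutputViewStreamWrapper stream) throws IOException {
        
            if (timers != null) {
                stream.writeInt(timers.size());
                for (InternalTimer<K, N> timer : timers) {
                    timerSerializer.serialize(timer, stream);
                }
            } else {
                stream.writeInt(0);
            }
        }
    
    `snapshotTimersForKeyGroup` would then call it once for the event-time timers and once
    for the processing-time timers.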


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106669974
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    --- End diff --
    
    I would suggest keeping the old interface and renaming this to `AbstractInternalTimerService<K, N> implements InternalTimerService<N>`. That way, we don't need to introduce the generic parameter K everywhere, which actually gives away an implementation detail (K is used only for a member, not for the interface methods). I also like to keep the interface slim; not all code that deals with `InternalTimerService` has to see all the methods, e.g. for snapshots.
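
    A rough sketch of that split, reusing only method signatures already present in this
    diff (the abstract class name follows the suggestion above):
    
        @Internal
        public interface InternalTimerService<N> {
            long currentProcessingTime();
            long currentWatermark();
            void registerProcessingTimeTimer(N namespace, long time);
            void deleteProcessingTimeTimer(N namespace, long time);
            void registerEventTimeTimer(N namespace, long time);
            void deleteEventTimeTimer(N namespace, long time);
        }
        
        @Internal
        public abstract class AbstractInternalTimerService<K, N>
                implements InternalTimerService<N>, ProcessingTimeCallback, EventTimeCallback {
            // K stays an implementation detail: snapshot/restore and the
            // per-key-group methods live here, not in the public interface.
        }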


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106671203
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when event time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    +	
    +	public void advanceWatermark(long watermark) throws Exception {
    +		if (watermark < currentWatermark) {
    +			throw new IllegalStateException("The watermark is late.");
    +		}
    +		
    +		currentWatermark = watermark;
    +		
    +		onEventTime(watermark);
    +	}
    +
    +	/**
    +	 * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to write to.
    +	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +	 */
    +	public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
    +		InstantiationUtil.serializeObject(stream, keySerializer);
    +		InstantiationUtil.serializeObject(stream, namespaceSerializer);
    +
    +		// write the event time timers
    +		Collection<InternalTimer<K, N>> eventTimers = getEventTimeTimersForKeyGroup(keyGroupIdx);
    +		if (eventTimers != null) {
    +			stream.writeInt(eventTimers.size());
    +			for (InternalTimer<K, N> timer : eventTimers) {
    +				this.timerSerializer.serialize(timer, stream);
    +			}
    +		} else {
    +			stream.writeInt(0);
    +		}
    +
    +		// write the processing time timers
    +		Collection<InternalTimer<K, N>> processingTimers = getProcessingTimeTimersForKeyGroup(keyGroupIdx);
    +		if (processingTimers != null) {
    +			stream.writeInt(processingTimers.size());
    +			for (InternalTimer<K, N> timer : processingTimers) {
    +				this.timerSerializer.serialize(timer, stream);
    +			}
    +		} else {
    +			stream.writeInt(0);
    +		}
    +	}
    +
    +	/**
    +	 * Restores the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to read from.
    +	 * @param keyGroupIdx the id of the key-group whose timers are to be restored.
    +	 * @param userCodeClassLoader the class loader that will be used to deserialize
    +	 * 								the local key and namespace serializers.
    +	 */
    +	public void restoreTimersForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
    +		TypeSerializer<K> tmpKeySerializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
    +		TypeSerializer<N> tmpNamespaceSerializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
    +
    +		if ((this.keySerializer != null && !this.keySerializer.equals(tmpKeySerializer)) ||
    +					(this.namespaceSerializer != null && !this.namespaceSerializer.equals(tmpNamespaceSerializer))) {
    +
    +				throw new IllegalArgumentException("Tried to restore timers " +
    +						"for the same service with different serializers.");
    +		}
    +
    +		this.keySerializer = tmpKeySerializer;
    +		this.namespaceSerializer = tmpNamespaceSerializer;
    +
    +		InternalTimer.TimerSerializer<K, N> timerSerializer =
    +				new InternalTimer.TimerSerializer<>(this.keySerializer, this.namespaceSerializer);
    +
    +		checkArgument(keyGroupRange.contains(keyGroupIdx),
    +				"Key Group " + keyGroupIdx + " does not belong to the local range.");
    +
    +		// read the event time timers
    +		int sizeOfEventTimeTimers = stream.readInt();
    +		if (sizeOfEventTimeTimers > 0) {
    +			List<InternalTimer<K, N>> eventTimeTimers = new ArrayList<>();
    +			for (int i = 0; i < sizeOfEventTimeTimers; i++) {
    +				InternalTimer<K, N> timer = timerSerializer.deserialize(stream);
    +				
    +				eventTimeTimers.add(timer);
    +			}
    +
    +			restoreEventTimeTimersForKeyGroup(keyGroupIdx, eventTimeTimers);
    +		}
    +
    +		// read the processing time timers
    +		int sizeOfProcessingTimeTimers = stream.readInt();
    +		if (sizeOfProcessingTimeTimers > 0) {
    +			List<InternalTimer<K, N>> processingTimeTimers = new ArrayList<>();
    +			for (int i = 0; i < sizeOfProcessingTimeTimers; i++) {
    +				InternalTimer<K, N> timer = timerSerializer.deserialize(stream);
    +				processingTimeTimers.add(timer);
    +			}
    +
    +			restoreProcessingTimeTimersForKeyGroup(keyGroupIdx, processingTimeTimers);
    +		}
    +	}
    +
    +	/**
    +	 * Starts the local {@link InternalTimerService} by:
    +	 * <ol>
    +	 *     <li>Setting the {@code keySerializer} and {@code namespaceSerializer} for the timers it will contain.</li>
    +	 *     <li>Setting the {@code triggerTarget} which contains the action to be performed when a timer fires.</li>
    +	 *     <li>Re-registering timers that were retrieved after recovering from a node failure, if any.</li>
    +	 * </ol>
    +	 * This method can be called multiple times, as long as it is called with the same serializers.
    +	 */
    +	void startTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerTarget)
    +	{
    +
    +		if (isInitialized) {
    --- End diff --
    
    Either `isInitialized` does not need to be volatile, or this code is potentially broken: a check-then-act on a volatile flag is not atomic. If you need the thread-safety, I suggest replacing it with `AtomicBoolean::compareAndSet(...)`.
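
    If the thread-safety is in fact needed, a minimal sketch with
    `java.util.concurrent.atomic.AtomicBoolean`, reusing the fields and checks from this
    diff:
    
        private final AtomicBoolean isInitialized = new AtomicBoolean(false);
        
        void startTimerService(
                TypeSerializer<K> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                Triggerable<K, N> triggerTarget) {
        
            if (isInitialized.compareAndSet(false, true)) {
                // exactly one caller wins and performs the initialization
                this.keySerializer = checkNotNull(keySerializer);
                this.namespaceSerializer = checkNotNull(namespaceSerializer);
                this.triggerTarget = checkNotNull(triggerTarget);
            } else {
                // later callers may only re-start the service with the same serializers
                checkArgument(keySerializer.equals(this.keySerializer)
                                && namespaceSerializer.equals(this.namespaceSerializer),
                        "The timer service was already started with different serializers.");
            }
        }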


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106677119
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database that stores all timers */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to
    +	 * trigger. Each partition's leader is stored in the heap. When the timers in a
    +	 * partition change, we update the partition's leader and the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    +			}
    +
    +			nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isCurrentHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer != null) {
    +				if (newHeadTimer.f1 < time) {
    +					throw new IllegalStateException();
    +				}
    +
    +				nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +		return eventTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +		return processingTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers() {
    +		return processingTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers() {
    +		return eventTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers(N namespace) {
    +		return processingTimeHeap.numTimers(namespace);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers(N namespace) {
    +		return eventTimeHeap.numTimers(namespace);
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Partitioning Methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Assigns the given key group to a partition.
    +	 */
    +	private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range");
    +
    +		long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
    +
    +		keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +		if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +			return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +		} else {
    +			return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +		}
    +	}
    +
    +	/**
    +	 * Compute the range of the given partition
    +	 */
    +	private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(partitionIndex >= 0, "Partition index must not be smaller than zero.");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +		if (partitionIndex >= numFatPartitions) {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +			return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +		} else {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +			return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +			LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +			keySerializer.serialize(rawTimer.f2, outputView);
    +			namespaceSerializer.serialize(rawTimer.f3, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the raw timer.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private Tuple4<Integer, Long, K, N> deserializeRawTimer(byte[] bytes) {
    +		ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    +		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +
    +		try {
    +			int keyGroup = IntSerializer.INSTANCE.deserialize(inputView);
    +			long timestamp = LongSerializer.INSTANCE.deserialize(inputView);
    +			K key = keySerializer.deserialize(inputView);
    +			N namespace = namespaceSerializer.deserialize(inputView);
    +
    +			return new Tuple4<>(keyGroup, timestamp, key, namespace);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the raw timer.", e);
    +		}
    +	}
    +
    +	/**
    +	 * A timer store backed by RocksDB. 
    +	 * 
    +	 * The timers are stored in RocksDB in the order of key groups. To allow 
    +	 * efficient access, the timers are partitioned and the leader of each 
    +	 * partition is stored in an in-memory heap. The top of the heap is 
    +	 * exactly the first timer to trigger. The heap is updated whenever the 
    +	 * partition's leader is updated.
    +	 */
    +	private class PersistentTimerHeap {
    +
    +		private final ColumnFamilyHandle handle;
    +
    +		/** Leader timers in the partitions */
    +		private final Tuple4<Integer, Long, K, N>[] partitionHeadTimers;
    +		
    +		/** The order of the partition's leader in the heap */
    +		private final int[] orders;
    +		
    +		/** The partitions in the order of their leaders in the heap */
    +		private final int[] indices;
    +		
    +		@SuppressWarnings("unchecked")
    +		PersistentTimerHeap(int capacity, ColumnFamilyHandle handle) {
    +			this.handle = handle;
    +
    +			this.partitionHeadTimers = (Tuple4<Integer, Long, K, N>[])new Tuple4[capacity];
    +			this.indices = new int[capacity];
    +			this.orders = new int[capacity];
    +			
    +			for (int i = 0; i < capacity; i++) {
    +				orders[i] = i;
    +				indices[i] = i;
    +			}
    +		}
    +
    +		private int getParentOrder(int order) {
    +			return (order - 1) / 2;
    +		}
    +
    +		private int getLeftChildOrder(int order) {
    +			return order * 2 + 1;
    +		}
    +
    +		private int getRightChildOrder(int order) {
    +			return order * 2 + 2;
    +		}
    +
    +		private long getTimestamp(Tuple4<Integer, Long, K, N> rawTimer) {
    +			return rawTimer == null ? Long.MAX_VALUE : rawTimer.f1;
    +		}
    +
    +		/** Rebuild the heap with the given DB. */
    +		void initialize() {
    +			int currentPartition = -1;
    +			Tuple4<Integer, Long, K, N> currentPartitionLeaderTimer = null;
    +
    +			RocksIterator iterator = db.newIterator(handle);
    +			iterator.seekToFirst();
    +
    +			while (iterator.isValid()) {
    +				Tuple4<Integer, Long, K, N> rawTimer = deserializeRawTimer(iterator.key());
    +
    +				int partition = getPartitionForKeyGroup(keyGroupRange, rawTimer.f0, numPartitions);
    +				if (partition == currentPartition) {
    +					if (currentPartitionLeaderTimer == null || rawTimer.f1 < currentPartitionLeaderTimer.f1) {
    +						currentPartitionLeaderTimer = rawTimer;
    +					}
    +				} else {
    +					if (currentPartitionLeaderTimer != null) {
    +						updatePartitionLeader(currentPartition, currentPartitionLeaderTimer);
    +					}
    +
    +					currentPartition = partition;
    +					currentPartitionLeaderTimer = rawTimer;
    +				}
    +
    +				int nextKeyGroup = rawTimer.f0 + 1;
    --- End diff --
    
    For example, here it would be more beneficial to have a dedicated class instead of a `Tuple4` for the timers. As it stands, it is hard to figure out the meanings of f0, f1, etc.
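
    For illustration, a dedicated class (the name `RawTimer` is made up) that could replace
    `Tuple4<Integer, Long, K, N>`:
    
        /** Hypothetical named replacement for the Tuple4-based raw timer. */
        final class RawTimer<K, N> {
        
            final int keyGroup;   // was f0
            final long timestamp; // was f1
            final K key;          // was f2
            final N namespace;    // was f3
        
            RawTimer(int keyGroup, long timestamp, K key, N namespace) {
                this.keyGroup = keyGroup;
                this.timestamp = timestamp;
                this.key = key;
                this.namespace = namespace;
            }
        }
    
    The line above would then read `int nextKeyGroup = rawTimer.keyGroup + 1;`.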


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @vpernin Thanks very much for your attention. The PR is supposed to work on 1.3-SNAPSHOT, but it's not testable right now due to some known bugs.
    
    Besides, I want to add support for asynchronous snapshots of timers in this pull request. Currently, the snapshots of timers are taken synchronously --- no stream record can be processed until the snapshot is complete. In our tests with millions of timers, the snapshotting takes several seconds to complete, so performance degrades significantly when checkpoints are frequent.
    
    To allow asynchronous snapshotting, we need some refactoring of how internal timer services are snapshotted and restored. With this change, `InternalTimerService`s, like keyed state, will be stored in the `KeyedStateBackend`. That way, we can benefit from the optimizations made for snapshotting keyed state, taking snapshots asynchronously (and, in the near future, incrementally).
    
    I am working on this right now. It would be appreciated if you could help test the feature when it is done.


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106676403
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database that stores all the timers */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition are changed, we
    +	 * update the partition's leader and the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    +			}
    +
    +			nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isCurrentHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer != null) {
    +				if (newHeadTimer.f1 < time) {
    +					throw new IllegalStateException();
    +				}
    +
    +				nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +		return eventTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +		return processingTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers() {
    +		return processingTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers() {
    +		return eventTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers(N namespace) {
    +		return processingTimeHeap.numTimers(namespace);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers(N namespace) {
    +		return eventTimeHeap.numTimers(namespace);
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Partitioning Methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Assigns the given key group to a partition.
    +	 */
    +	private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range");
    +
    +		long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
    +
    +		keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +		if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +			return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +		} else {
    +			return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +		}
    +	}
    +
    +	/**
    +	 * Computes the range of the given partition.
    +	 */
    +	private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(partitionIndex >= 0, "Partition index must not be smaller than zero.");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +		if (partitionIndex >= numFatPartitions) {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +			return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +		} else {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +			return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +			LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +			keySerializer.serialize(rawTimer.f2, outputView);
    +			namespaceSerializer.serialize(rawTimer.f3, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the raw timer.", e);
    --- End diff --
    
    I would discourage the use of plain `RuntimeException`, for the same reason we use more specific subclasses of `Exception`, like `IOException`. Better to use a more specific subclass of `RuntimeException`, like `IllegalStateException` or whatever looks more appropriate. This should be changed in several places in this class.
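    
    A minimal sketch of the suggested change, assuming `IllegalStateException` is the appropriate choice here (any specific unchecked subclass would do):
    
    ```java
    import java.io.IOException;
    
    final class ExceptionStyleExample {
    	// Stand-in for the serializer call that may fail with an IOException.
    	static byte[] serialize() throws IOException {
    		return new byte[0];
    	}
    
    	static byte[] serializeOrFail() {
    		try {
    			return serialize();
    		} catch (IOException e) {
    			// A specific unchecked exception instead of a plain RuntimeException
    			// lets callers catch and handle this failure mode precisely.
    			throw new IllegalStateException("Error while serializing the raw timer.", e);
    		}
    	}
    }
    ```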


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106677685
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    +	/**
    +	 * A timer store backed by RocksDB. 
    +	 * 
    +	 * The timers are stored in RocksDB in the order of key groups. To allow 
    +	 * efficient access, the timers are partitioned and the leader of each 
    +	 * partition is stored in an in-memory heap. The top of the heap is 
    +	 * exactly the first timer to trigger. The heap is updated whenever the 
    +	 * partition's leader is updated.
    +	 */
    +	private class PersistentTimerHeap {
    --- End diff --
    
    I think this class could benefit from more detailed documentation, for example about the difference and relationship between key groups, partitions, and fat partitions.
    You could also explain that this is conceptually a heap-of-heaps: the outer heap holds inner heaps, one for each partition, and the inner heaps hold the timers. The outer heap is in-memory, while the inner heaps are in fact completely ordered and held in RocksDB.
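    
    A minimal, self-contained sketch of that heap-of-heaps idea (all names are illustrative; plain `TreeSet`s stand in for the RocksDB-backed inner heaps, and timers are reduced to bare timestamps):
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;
    import java.util.TreeSet;
    
    final class HeapOfHeapsSketch {
    	// One fully ordered inner heap per partition; RocksDB plays this role in the PR.
    	private final List<TreeSet<Long>> partitions = new ArrayList<>();
    	// Outer in-memory heap of partition indices, ordered by each partition's leader.
    	private final PriorityQueue<Integer> outerHeap;
    
    	HeapOfHeapsSketch(int numPartitions) {
    		for (int i = 0; i < numPartitions; i++) {
    			partitions.add(new TreeSet<>());
    		}
    		outerHeap = new PriorityQueue<>((a, b) -> Long.compare(leaderOf(a), leaderOf(b)));
    	}
    
    	// The leader of a partition is its smallest timer.
    	private long leaderOf(int partition) {
    		TreeSet<Long> timers = partitions.get(partition);
    		return timers.isEmpty() ? Long.MAX_VALUE : timers.first();
    	}
    
    	// Adding a timer may change the partition's leader, so the partition's
    	// outer-heap entry is removed and re-inserted to restore the heap property.
    	void add(int partition, long timestamp) {
    		outerHeap.remove(partition);
    		partitions.get(partition).add(timestamp);
    		outerHeap.add(partition);
    	}
    
    	// The top of the outer heap is the partition holding the globally next timer.
    	long top() {
    		return outerHeap.isEmpty() ? Long.MAX_VALUE : leaderOf(outerHeap.peek());
    	}
    }
    ```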


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang Sounds nice! Can you please open a PR if possible? Thanks!


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by vpernin <gi...@git.apache.org>.
Github user vpernin commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    Is this PR in a testable/working state based on 1.3-SNAPSHOT?
    I could help test with quite a large number of window timers (between 2M and 6M).


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106675685
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    --- End diff --
    
    I suggest replacing this with `org.apache.flink.core.memory.ByteArrayOutputStreamWithPos`, which removes the unnecessary synchronization from all methods. There are more cases of this in the class.
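    
    A drop-in sketch of the suggested replacement for the quoted lines (assuming Flink's unsynchronized stream classes are on the classpath):
    
    ```java
    import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    
    // Unlike java.io.ByteArrayOutputStream, this variant is not synchronized,
    // which avoids needless locking on the per-timer serialization hot path.
    ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    ```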


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670841
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    +	
    +	public void advanceWatermark(long watermark) throws Exception {
    +		if (watermark < currentWatermark) {
    +			throw new IllegalStateException("The watermark is late.");
    +		}
    +		
    +		currentWatermark = watermark;
    +		
    +		onEventTime(watermark);
    +	}
    +
    +	/**
    +	 * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to write to.
    +	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +	 */
    +	public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
    +		InstantiationUtil.serializeObject(stream, keySerializer);
    +		InstantiationUtil.serializeObject(stream, namespaceSerializer);
    +
    +		// write the event time timers
    +		Collection<InternalTimer<K, N>> eventTimers = getEventTimeTimersForKeyGroup(keyGroupIdx);
    +		if (eventTimers != null) {
    +			stream.writeInt(eventTimers.size());
    +			for (InternalTimer<K, N> timer : eventTimers) {
    +				this.timerSerializer.serialize(timer, stream);
    +			}
    +		} else {
    +			stream.writeInt(0);
    +		}
    +
    +		// write the processing time timers
    +		Collection<InternalTimer<K, N>> processingTimers = getProcessingTimeTimersForKeyGroup(keyGroupIdx);
    +		if (processingTimers != null) {
    +			stream.writeInt(processingTimers.size());
    +			for (InternalTimer<K, N> timer : processingTimers) {
    +				this.timerSerializer.serialize(timer, stream);
    +			}
    +		} else {
    +			stream.writeInt(0);
    +		}
    +	}
    +
    +	/**
    +	 * Restores the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to read from.
    +	 * @param keyGroupIdx the id of the key-group whose timers are restored.
    +	 * @param userCodeClassLoader the class loader that will be used to deserialize
    +	 * 								the local key and namespace serializers.
    +	 */
    +	public void restoreTimersForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
    --- End diff --
    
    Same comments as for snapshot apply here: we can reduce visibility and refactor to de-duplicate code.
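    
    As a rough sketch of the de-duplication, the two read loops for event-time and processing-time timers could be collapsed into one private helper (the helper name is hypothetical; it assumes the timer serializer exposes the standard TypeSerializer#deserialize):
    
        private List<InternalTimer<K, N>> readTimerSet(DataInputViewStreamWrapper stream) throws IOException {
            int numTimers = stream.readInt();
            List<InternalTimer<K, N>> timers = new ArrayList<>(numTimers);
            for (int i = 0; i < numTimers; i++) {
                // the serializer restores timestamp, key and namespace of each timer
                timers.add(timerSerializer.deserialize(stream));
            }
            return timers;
        }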


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106675210
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    --- End diff --
    
    The exception should contain a message about what went wrong. This should also be fixed in similar cases in this class.
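    
    For illustration, something along these lines (the message wording is only a suggestion):
    
        Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
        if (newHeadTimer == null || newHeadTimer.f1 != time) {
            throw new IllegalStateException("Expected the newly registered timer (timestamp " + time
                + ") to be the new head of the processing-time heap, but found " + newHeadTimer + ".");
        }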


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106674867
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    --- End diff --
    
    I think it is important that this can report the IOException. The calling code might need a way to react when the delete goes wrong, to prevent lingering files. See also my comment about adding the exception to the superclass's method.
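    
    A sketch of how this could look if the signature is widened to declare the exception (the throws clause is the suggested change, not what the PR currently has):
    
        // InternalTimerService
        public abstract void close() throws IOException;
    
        // RocksDBInternalTimerService
        @Override
        public void close() throws IOException {
            if (db != null) {
                db.close();
            }
            if (dbPath != null) {
                FileSystem fileSystem = dbPath.getFileSystem();
                if (fileSystem.exists(dbPath)) {
                    // a failed delete now surfaces to the caller instead of being wrapped
                    fileSystem.delete(dbPath, true);
                }
            }
        }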


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106669225
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    --- End diff --
    
    Unfinished comment


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106673680
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    --- End diff --
    
    I wonder if we should keep a reference to this so that we can dispose the native object on close? Again, this problem goes away when reusing the RocksDB instance.
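    
    Roughly, assuming the options are kept in a field (sketch only; recent RocksDB versions expose close() for releasing native handles, older ones use dispose()):
    
        private final DBOptions dbOptions; // retained so the native handle can be released
    
        @Override
        public void close() {
            if (db != null) {
                db.close();
            }
            if (dbOptions != null) {
                dbOptions.close(); // frees the native DBOptions object
            }
            // existing directory cleanup stays unchanged
        }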


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    I think this implementation was close to complete and should be working, but the PR is outdated. It should be possible to attempt a manual rebase.


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106672829
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    --- End diff --
    
    I think we should avoid creating additional RocksDB instances if we can. They make native memory consumption more unpredictable and create more files on snapshots. My suggestion is to refactor so that timer services must be requested through a keyed state backend. The RocksDB backend could then reuse the same database instance as the keyed backend for the timer service, reducing all of these overheads. The request should still allow asking for a RocksDB-based timer service even when using a `HeapKeyedStateBackend`, and vice versa.
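    
    To illustrate the direction (the method and its shape are hypothetical, not part of this PR): the keyed state backend could act as the factory, so that the RocksDB backend can hand out a timer service that shares its database instance, while the heap backend returns a heap-based one:
    
        // hypothetical factory method on the keyed state backend
        public abstract <N> InternalTimerService<K, N> createInternalTimerService(
                ProcessingTimeService processingTimeService,
                KeyContext keyContext);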


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang I had a brief look at the updated PR. It seems like the current version does not integrate timers with the other keyed state? I am asking because my plan is to integrate timers more deeply with the keyed backends so that we can also use asynchronous and incremental snapshots. One of your previous comments mentioned something in this direction as well, and I wanted to ask what your plans are. Otherwise, I would take over and probably cherry-pick some parts, like the RocksDB timer heap, from this PR and integrate them with the keyed backends myself, so that those features can also be supported by making timers part of the backends' checkpoint data.


---

[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3359
  
    @shixiaogang The changes sound helpful, considering that this PR was implemented before incremental checkpoints. For a final opinion I would have to review the code. One concern I have is the uncontrolled growth of more and more reported state types that we introduce. I am currently working on a feature that introduces recovery from task-local state, which also looks like it will add to the states that can be produced in a snapshot. So we might need some form of consolidation in the future to avoid a wild growth of state types that makes everything more complex and harder to maintain. But overall, I think the general idea is very good!


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106676752
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * update the partition's leader and adjust the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    +			}
    +
    +			nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isCurrentHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer != null) {
    +				if (newHeadTimer.f1 < time) {
    +					throw new IllegalStateException();
    +				}
    +
    +				nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +		return eventTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +		return processingTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers() {
    +		return processingTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers() {
    +		return eventTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers(N namespace) {
    +		return processingTimeHeap.numTimers(namespace);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers(N namespace) {
    +		return eventTimeHeap.numTimers(namespace);
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Partitioning Methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Assigns the given key group to a partition.
    +	 */
    +	private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must not be smaller than zero.");
    +
    +		Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range");
    +
    +		long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
    +
    +		keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +		if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +			return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +		} else {
    +			return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +		}
    +	}
    +
    +	/**
    +	 * Compute the range of the given partition
    +	 */
    +	private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(partitionIndex >= 0, "Partition index must be not smaller than zero.");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +		if (partitionIndex >= numFatPartitions) {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +			return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +		} else {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +			return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +			LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +			keySerializer.serialize(rawTimer.f2, outputView);
    +			namespaceSerializer.serialize(rawTimer.f3, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the raw timer.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private Tuple4<Integer, Long, K, N> deserializeRawTimer(byte[] bytes) {
    +		ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    +		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +
    +		try {
    +			int keyGroup = IntSerializer.INSTANCE.deserialize(inputView);
    +			long timestamp = LongSerializer.INSTANCE.deserialize(inputView);
    +			K key = keySerializer.deserialize(inputView);
    +			N namespace = namespaceSerializer.deserialize(inputView);
    +
    +			return new Tuple4<>(keyGroup, timestamp, key, namespace);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the raw timer.", e);
    +		}
    +	}
    +
    +	/**
    +	 * A timer store backed by RocksDB. 
    +	 * 
    +	 * The timers are stored in RocksDB in the order of key groups. To allow 
    +	 * efficient access, the timers are partitioned and the leader of each 
    +	 * partition is stored in an in-memory heap. The top of the heap is 
    +	 * exactly the first timer to trigger. The heap is updated whenever the 
    +	 * partition's leader is updated.
    +	 */
    +	private class PersistentTimerHeap {
    +
    +		private final ColumnFamilyHandle handle;
    +
    +		/** Leader timers in the partitions */
    +		private final Tuple4<Integer, Long, K, N>[] partitionHeadTimers;
    +		
    +		/** The order of the partition's leader in the heap */
    +		private final int[] orders;
    +		
    +		/** The partitions in the order of their leaders in the heap */
    +		private final int[] indices;
    +		
    +		@SuppressWarnings("unchecked")
    +		PersistentTimerHeap(int capacity, ColumnFamilyHandle handle) {
    +			this.handle = handle;
    +
    +			this.partitionHeadTimers = (Tuple4<Integer, Long, K, N>[])new Tuple4[capacity];
    +			this.indices = new int[capacity];
    +			this.orders = new int[capacity];
    +			
    +			for (int i = 0; i < capacity; i++) {
    +				orders[i] = i;
    +				indices[i] = i;
    +			}
    +		}
    +
    +		private int getParentOrder(int order) {
    +			return (order - 1) / 2;
    +		}
    +
    +		private int getLeftChildOrder(int order) {
    +			return order * 2 + 1;
    +		}
    +
    +		private int getRightChildOrder(int order) {
    +			return order * 2 + 2;
    +		}
    +
    +		private long getTimestamp(Tuple4<Integer, Long, K, N> rawTimer) {
    +			return rawTimer == null ? Long.MAX_VALUE : rawTimer.f1;
    +		}
    +
    +		/** Rebuild the heap with the given DB. */
    +		void initialize() {
    +			int currentPartition = -1;
    +			Tuple4<Integer, Long, K, N> currentPartitionLeaderTimer = null;
    +
    +			RocksIterator iterator = db.newIterator(handle);
    --- End diff --
    
    I suggest using try-with-resources with the `RocksIterator` to ensure that the native resources are closed after usage. The same applies in the other places where iterators are created.
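    
    That is, something like the following (sketch; RocksIterator is releasable via close() in recent RocksDB versions, older versions use dispose() in a finally block):
    
        try (RocksIterator iterator = db.newIterator(handle)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                Tuple4<Integer, Long, K, N> timer = deserializeRawTimer(iterator.value());
                // track the smallest timer per partition to rebuild the leaders
            }
        } // the native iterator handle is released even if deserialization throws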


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106674528
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * will change the partition's leader and update the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    --- End diff --
    
    Better to pass an explicit charset to the `getBytes(...)` calls. I think we have manual tests that guard against calls without one and could fail on this line. The same pattern is used in other places in this class and should be changed there as well.
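    
    For illustration, roughly (a sketch only, using `StandardCharsets.UTF_8`, which is available since Java 7; the project may prefer an existing project-wide default-charset constant instead, if one exists):
    
        import java.nio.charset.StandardCharsets;
    
        ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor =
                new ColumnFamilyDescriptor("eventTime".getBytes(StandardCharsets.UTF_8), columnFamilyOptions);
        ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor =
                new ColumnFamilyDescriptor("processingTime".getBytes(StandardCharsets.UTF_8), columnFamilyOptions);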


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106677330
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +	
    +	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +	
    +	/** The database where all timers are stored */
    +	private final RocksDB db;
    +	
    +	/** The path where the RocksDB instance is located */
    +	private final Path dbPath;
    +
    +	/**
    +	 * The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each
    +	 * partition's leader is stored in the heap. When the timers in a partition change, we
    +	 * will change the partition's leader and update the heap accordingly.
    +	 */
    +	private final int numPartitions;
    +	private final PersistentTimerHeap eventTimeHeap;
    +	private final PersistentTimerHeap processingTimeHeap;
    +	
    +	private static int MAX_PARTITIONS = (1 << 16);
    +
    +	public RocksDBInternalTimerService(
    +			int totalKeyGroups,
    +			KeyGroupRange keyGroupRange,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			Path dbPath) {
    +
    +		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +		
    +		this.dbPath = dbPath;
    +		
    +		try {
    +			FileSystem fileSystem = this.dbPath.getFileSystem();
    +			if (fileSystem.exists(this.dbPath)) {
    +				fileSystem.delete(this.dbPath, true);
    +			}
    +			
    +			fileSystem.mkdirs(dbPath);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +		}
    +
    +		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +				.setMergeOperator(new StringAppendOperator())
    +				.setCompactionStyle(CompactionStyle.UNIVERSAL);
    +		ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +		DBOptions dbOptions = new DBOptions()
    +				.setCreateIfMissing(true)
    +				.setUseFsync(false)
    +				.setDisableDataSync(true)
    +				.setMaxOpenFiles(-1);
    +
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +		try {
    +			this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +		}
    +
    +		this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +		ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +		ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +		try {
    +			ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +			ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +			eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +			processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +		} catch (RocksDBException e) {
    +			throw new RuntimeException("Error while creating the column families.", e);
    +		}
    +
    +		this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +		this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  InternalTimerService Implementation
    +	// ------------------------------------------------------------------------
    +	
    +	@Override
    +	public void start() {
    +		// rebuild the heaps
    +		eventTimeHeap.initialize();
    +		processingTimeHeap.initialize();
    +		
    +		// register the processing timer with the minimum timestamp
    +		Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +		if (headProcessingTimer != null) {
    +			nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (db != null) {
    +			db.close();
    +		}
    +		
    +		if (dbPath != null) {
    +			try {
    +				FileSystem fileSystem = dbPath.getFileSystem();
    +				if (fileSystem.exists(dbPath)) {
    +					fileSystem.delete(dbPath, true);
    +				}
    +			} catch (IOException e) {
    +				throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void onEventTime(long timestamp) throws Exception {
    +		List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long timestamp) throws Exception {
    +		nextTimer = null;
    +
    +		List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +		for (Tuple4<Integer, Long, K, N> timer : timers) {
    +			keyContext.setCurrentKey(timer.f2);
    +			triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +		}
    +
    +		if (nextTimer == null) {
    +			Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +			if (headTimer != null) {
    +				nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isNewHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +				throw new IllegalStateException();
    +			}
    +
    +			nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +		if (isCurrentHead) {
    +			if (nextTimer != null) {
    +				nextTimer.cancel(false);
    +			}
    +
    +			Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +			if (newHeadTimer != null) {
    +				if (newHeadTimer.f1 < time) {
    +					throw new IllegalStateException();
    +				}
    +
    +				nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +			}
    +		}
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +		return eventTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +		return processingTimeHeap.getTimers(keyGroup);
    +	}
    +
    +	@Override
    +	public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +		processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers() {
    +		return processingTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers() {
    +		return eventTimeHeap.numTimers(null);
    +	}
    +
    +	@Override
    +	public int numProcessingTimeTimers(N namespace) {
    +		return processingTimeHeap.numTimers(namespace);
    +	}
    +
    +	@Override
    +	public int numEventTimeTimers(N namespace) {
    +		return eventTimeHeap.numTimers(namespace);
    +	}
    +	
    +	// ------------------------------------------------------------------------
    +	//  Partitioning Methods
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Assigns the given key group to a partition.
    +	 */
    +	private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range");
    +
    +		long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
    +
    +		keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +		if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +			return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +		} else {
    +			return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +		}
    +	}
    +
    +	/**
    +	 * Compute the range of the given partition
    +	 */
    +	private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +		Preconditions.checkArgument(keyGroupRange != null, "The range must not be null");
    +		Preconditions.checkArgument(partitionIndex >= 0, "Partition index must not be smaller than zero.");
    +		Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +		long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +		long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +		if (partitionIndex >= numFatPartitions) {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +			return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +		} else {
    +			int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +			int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +			return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serialization Methods
    +	// ------------------------------------------------------------------------
    +
    +	private byte[] serializeKeyGroup(int keyGroup) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the key group.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +		try {
    +			IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +			LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +			keySerializer.serialize(rawTimer.f2, outputView);
    +			namespaceSerializer.serialize(rawTimer.f3, outputView);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing the raw timer.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	private Tuple4<Integer, Long, K, N> deserializeRawTimer(byte[] bytes) {
    +		ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    +		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +
    +		try {
    +			int keyGroup = IntSerializer.INSTANCE.deserialize(inputView);
    +			long timestamp = LongSerializer.INSTANCE.deserialize(inputView);
    +			K key = keySerializer.deserialize(inputView);
    +			N namespace = namespaceSerializer.deserialize(inputView);
    +
    +			return new Tuple4<>(keyGroup, timestamp, key, namespace);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while deserializing the raw timer.", e);
    +		}
    +	}
    +
    +	/**
    +	 * A timer store backed by RocksDB. 
    +	 * 
    +	 * The timers are stored in RocksDB in the order of key groups. To allow 
    +	 * efficient access, the timers are partitioned and the leader of each 
    +	 * partition is stored in an in-memory heap. The top of the heap is 
    +	 * exactly the first timer to trigger. The heap is updated whenever the 
    +	 * partition's leader is updated.
    +	 */
    +	private class PersistentTimerHeap {
    +
    +		private final ColumnFamilyHandle handle;
    +
    +		/** Leader timers in the partitions */
    +		private final Tuple4<Integer, Long, K, N>[] partitionHeadTimers;
    +		
    +		/** The order of the partition's leader in the heap */
    +		private final int[] orders;
    +		
    +		/** The partitions in the order of their leaders in the heap */
    +		private final int[] indices;
    +		
    +		@SuppressWarnings("unchecked")
    +		PersistentTimerHeap(int capacity, ColumnFamilyHandle handle) {
    +			this.handle = handle;
    +
    +			this.partitionHeadTimers = (Tuple4<Integer, Long, K, N>[])new Tuple4[capacity];
    +			this.indices = new int[capacity];
    +			this.orders = new int[capacity];
    +			
    +			for (int i = 0; i < capacity; i++) {
    +				orders[i] = i;
    +				indices[i] = i;
    +			}
    +		}
    +
    +		private int getParentOrder(int order) {
    +			return (order - 1) / 2;
    +		}
    +
    +		private int getLeftChildOrder(int order) {
    +			return order * 2 + 1;
    +		}
    +
    +		private int getRightChildOrder(int order) {
    +			return order * 2 + 2;
    +		}
    +
    +		private long getTimestamp(Tuple4<Integer, Long, K, N> rawTimer) {
    +			return rawTimer == null ? Long.MAX_VALUE : rawTimer.f1;
    +		}
    +
    +		/** Rebuild the heap with the given DB. */
    +		void initialize() {
    +			int currentPartition = -1;
    +			Tuple4<Integer, Long, K, N> currentPartitionLeaderTimer = null;
    +
    +			RocksIterator iterator = db.newIterator(handle);
    +			iterator.seekToFirst();
    +
    +			while (iterator.isValid()) {
    +				Tuple4<Integer, Long, K, N> rawTimer = deserializeRawTimer(iterator.key());
    +
    +				int partition = getPartitionForKeyGroup(keyGroupRange, rawTimer.f0, numPartitions);
    +				if (partition == currentPartition) {
    +					if (currentPartitionLeaderTimer == null || rawTimer.f1 < currentPartitionLeaderTimer.f1) {
    +						currentPartitionLeaderTimer = rawTimer;
    +					}
    +				} else {
    +					if (currentPartitionLeaderTimer != null) {
    +						updatePartitionLeader(currentPartition, currentPartitionLeaderTimer);
    +					}
    +
    +					currentPartition = partition;
    +					currentPartitionLeaderTimer = rawTimer;
    +				}
    +
    +				int nextKeyGroup = rawTimer.f0 + 1;
    +				byte[] nextKeyGroupBytes = serializeKeyGroup(nextKeyGroup);
    +				iterator.seek(nextKeyGroupBytes);
    +			}
    +
    +			iterator.close();
    +
    +			if (currentPartitionLeaderTimer != null) {
    +				updatePartitionLeader(currentPartition, currentPartitionLeaderTimer);
    +			}
    +		}
    +
    +		/**
    +		 * The first timer in the store.
    +		 */
    +		Tuple4<Integer, Long, K, N> top() {
    +			return partitionHeadTimers[indices[0]];
    +		}
    +
    +		/**
    +		 * Add a new timer into the store.
    +		 */
    +		boolean add(K key, N namespace, long timestamp) {
    +			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
    +			int partition = getPartitionForKeyGroup(keyGroupRange, keyGroup, numPartitions);
    +
    +			Tuple4<Integer, Long, K, N> timer = new Tuple4<>(keyGroup, timestamp, key, namespace);
    +			insertDB(timer);
    +
    +			Tuple4<Integer, Long, K, N> headTimer = partitionHeadTimers[indices[0]];
    +			boolean isNewHead = (headTimer == null || timer.f1 < headTimer.f1);
    +
    +			// Update the heap if the new timer is the new leader of the partition
    +			Tuple4<Integer, Long, K, N> partitionHeadTimer = partitionHeadTimers[partition];
    +			if (partitionHeadTimer == null || timer.f1 < partitionHeadTimer.f1) {
    +				updatePartitionLeader(partition, timer);
    +			}
    +
    +			return isNewHead;
    +		}
    +
    +		/**
    +		 * Remove the given timer from the store.
    +		 */
    +		boolean remove(K key, N namespace, long timestamp) {
    +			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
    +			int partition = getPartitionForKeyGroup(keyGroupRange, keyGroup, numPartitions);
    +
    +			Tuple4<Integer, Long, K, N> timer = new Tuple4<>(keyGroup, timestamp, key, namespace);
    +			removeDB(timer);
    +
    +			Tuple4<Integer, Long, K, N> headTimer = partitionHeadTimers[indices[0]];
    +			boolean isCurrentHead = (headTimer != null && headTimer.equals(timer));
    +
    +			// Elect a new leader and update the heap if the deleted timer is the current leader of the partition 
    +			Tuple4<Integer, Long, K, N> partitionHeadTimer = partitionHeadTimers[partition];
    +			if (timer.equals(partitionHeadTimer)) {
    +				Tuple4<Integer, Long, K, N> newPartitionHeadTimer = electPartitionLeader(partition);
    +				updatePartitionLeader(partition, newPartitionHeadTimer);
    +			}
    +
    +			return isCurrentHead;
    +		}
    +
    +		/**
    +		 * Remove and return all timers that are no later than the given time.
    +		 */
    +		List<Tuple4<Integer, Long, K, N>> peek(long timestamp) {
    +			List<Tuple4<Integer, Long, K, N>> expiredTimers = new ArrayList<>();
    +
    +			while (true) {
    +				int partition = indices[0];
    +
    +				Tuple4<Integer, Long, K, N> partitionHeadTimer = partitionHeadTimers[partition];
    +				if (partitionHeadTimer == null || partitionHeadTimer.f1 > timestamp) {
    +					break;
    +				}
    +
    +				Tuple4<Integer, Long, K, N> newPartitionLeader = electPartitionLeader(partition, timestamp, expiredTimers);
    +				updatePartitionLeader(partition, newPartitionLeader);
    +			}
    +
    +			return expiredTimers;
    +		}
    +
    +		/**
    +		 * Return all the timers in the given key group.
    +		 */
    +		Set<InternalTimer<K,N>> getTimers(int keyGroup) {
    +			Set<InternalTimer<K, N>> timers = new HashSet<>();
    +
    +			RocksIterator iterator = db.newIterator(handle);
    +
    +			byte[] keyGroupBytes = serializeKeyGroup(keyGroup);
    +			iterator.seek(keyGroupBytes);
    +
    +			while (iterator.isValid()) {
    +				Tuple4<Integer, Long, K, N> rawTimer = deserializeRawTimer(iterator.key());
    +
    +				if (rawTimer.f0 != keyGroup) {
    +					break;
    +				}
    +
    +				timers.add(new InternalTimer<>(rawTimer.f1, rawTimer.f2, rawTimer.f3));
    +
    +				iterator.next();
    +			}
    +
    +			iterator.close();
    +
    +			return timers;
    +		}
    +
    +		/**
    +		 * Restore the key group with the given timers
    +		 */
    +		void restoreTimers(int keyGroup, Iterable<InternalTimer<K, N>> timers) {
    +			for (InternalTimer<K, N> timer : timers) {
    +				Tuple4<Integer, Long, K, N> rawTimer = new Tuple4<>(keyGroup, timer.getTimestamp(), timer.getKey(), timer.getNamespace());
    +
    +				insertDB(rawTimer);
    +			}
    +		}
    +		
    +		/** 
    +		 * Return the number of timers in the given namespace. In the cases 
    +		 * where the given namespace is null, all timers will be counted.
    +		 */
    +		int numTimers(N namespace) {
    +			int count = 0;
    +
    +			RocksIterator iterator = db.newIterator(handle);
    +			iterator.seekToFirst();
    +
    +			while (iterator.isValid()) {
    +
    +				boolean matched = true;
    +				
    +				if (namespace != null) {
    +					Tuple4<Integer, Long, K, N> rawTimer = deserializeRawTimer(iterator.key());
    +
    +					if (rawTimer.f3 != null && !rawTimer.f3.equals(namespace)) {
    +						matched = false;
    +					}
    +				}
    +
    +				if (matched) {
    +					count++;
    +				}
    +
    +				iterator.next();
    +			}
    +
    +			return count;
    +		}
    +
    +		private void removeDB(Tuple4<Integer, Long, K, N> rawTimer) {
    +			try {
    +				byte[] bytes = serializeRawTimer(rawTimer);
    +				db.remove(handle, bytes);
    +			} catch (RocksDBException e) {
    +				throw new RuntimeException("Error while removing timer from RocksDB.", e);
    +			}
    +		}
    +
    +		private void insertDB(Tuple4<Integer, Long, K, N> rawTimer) {
    +			try {
    +				byte[] bytes = serializeRawTimer(rawTimer);
    +				db.put(handle, bytes, "dummy".getBytes());
    +			} catch (RocksDBException e) {
    +				throw new RuntimeException("Error while getting timer from RocksDB.", e);
    +			}
    +		}
    +
    +		/**
    +		 * Scan the partition to find the leader.
    +		 * 
    +		 * @param partition The partition to update
    +		 * @return The new leader of the partition
    +		 */
    +		private Tuple4<Integer, Long, K, N> electPartitionLeader(int partition) {
    +			return electPartitionLeader(partition, Long.MIN_VALUE, new ArrayList<Tuple4<Integer, Long, K, N>>());
    +		}
    +
    +		/**
    +		 * Scan the partition to find a new leader whose timestamp is larger
    +		 * than the given timestamp. All timers that are no later than the given
    +		 * timestamp will be removed from the DB.
    +		 * 
    +		 * @param partition The partition to update
    +		 * @param timestamp The expiration timestamp
    +		 * @param expiredTimers The list to store expired timers
    +		 * @return The new leader of the partition
    +		 */
    +		private Tuple4<Integer, Long, K, N> electPartitionLeader(int partition, long timestamp, List<Tuple4<Integer, Long, K, N>> expiredTimers) {
    +			Tuple4<Integer, Long, K, N> partitionHeadTimer = null;
    +			List<Tuple4<Integer, Long, K, N>> partitionExpiredTimers = new ArrayList<>();
    +
    +			RocksIterator iterator = db.newIterator(handle);
    +
    +			KeyGroupRange partitionRange = getRangeForPartition(keyGroupRange, partition, numPartitions);
    +			if (partitionRange == null) {
    +				throw new IllegalStateException();
    +			}
    +
    +			// Start the scanning from the first key group in the partition
    +			int currentKeyGroup = partitionRange.getStartKeyGroup();
    +			byte[] currentKeyGroupBytes = serializeKeyGroup(currentKeyGroup);
    +			iterator.seek(currentKeyGroupBytes);
    +
    +			while (true) {
    --- End diff --
    
    Replace with `while (iterator.isValid())`?
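    
    That is, roughly (a sketch of the intended shape only; the rest of the loop body is cut off in the diff above, so the details here are placeholders):
    
        iterator.seek(currentKeyGroupBytes);
        while (iterator.isValid()) {
            Tuple4<Integer, Long, K, N> rawTimer = deserializeRawTimer(iterator.key());
            if (rawTimer.f0 > partitionRange.getEndKeyGroup()) {
                break; // left the partition's key-group range
            }
            // ... collect expired timers / track the minimum as the new leader ...
            iterator.next();
        }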


---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670490
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    +	
    +	public void advanceWatermark(long watermark) throws Exception {
    +		if (watermark < currentWatermark) {
    +			throw new IllegalStateException("The watermark is late.");
    +		}
    +		
    +		currentWatermark = watermark;
    +		
    +		onEventTime(watermark);
    +	}
    +
    +	/**
    +	 * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to write to.
    +	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +	 */
    +	public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
    --- End diff --
    
    I think this does not require public visibility.
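    
    For example (a sketch; package-private should be enough here, assuming the only callers are `AbstractStreamOperator` and tests living in the same package):
    
        void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
            // same body as before; only the `public` modifier is dropped
        }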


---