You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:24 UTC
[19/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
deleted file mode 100644
index bb98e07..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.lock;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Factory to create zookeeper based locks.
- */
-public class ZKSessionLockFactory implements SessionLockFactory {
-
- private final ZooKeeperClient zkc;
- private final String clientId;
- private final OrderedScheduler lockStateExecutor;
- private final long lockOpTimeout;
- private final int lockCreationRetries;
- private final long zkRetryBackoffMs;
-
- // Stats
- private final StatsLogger lockStatsLogger;
-
- public ZKSessionLockFactory(ZooKeeperClient zkc,
- String clientId,
- OrderedScheduler lockStateExecutor,
- int lockCreationRetries,
- long lockOpTimeout,
- long zkRetryBackoffMs,
- StatsLogger statsLogger) {
- this.zkc = zkc;
- this.clientId = clientId;
- this.lockStateExecutor = lockStateExecutor;
- this.lockCreationRetries = lockCreationRetries;
- this.lockOpTimeout = lockOpTimeout;
- this.zkRetryBackoffMs = zkRetryBackoffMs;
-
- this.lockStatsLogger = statsLogger.scope("lock");
- }
-
- @Override
- public Future<SessionLock> createLock(String lockPath,
- DistributedLockContext context) {
- AtomicInteger numRetries = new AtomicInteger(lockCreationRetries);
- final AtomicReference<Throwable> interruptedException = new AtomicReference<Throwable>(null);
- Promise<SessionLock> createPromise =
- new Promise<SessionLock>(new com.twitter.util.Function<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable t) {
- interruptedException.set(t);
- return BoxedUnit.UNIT;
- }
- });
- createLock(
- lockPath,
- context,
- interruptedException,
- numRetries,
- createPromise,
- 0L);
- return createPromise;
- }
-
- void createLock(final String lockPath,
- final DistributedLockContext context,
- final AtomicReference<Throwable> interruptedException,
- final AtomicInteger numRetries,
- final Promise<SessionLock> createPromise,
- final long delayMs) {
- lockStateExecutor.schedule(lockPath, new Runnable() {
- @Override
- public void run() {
- if (null != interruptedException.get()) {
- createPromise.updateIfEmpty(new Throw<SessionLock>(interruptedException.get()));
- return;
- }
- try {
- SessionLock lock = new ZKSessionLock(
- zkc,
- lockPath,
- clientId,
- lockStateExecutor,
- lockOpTimeout,
- lockStatsLogger,
- context);
- createPromise.updateIfEmpty(new Return<SessionLock>(lock));
- } catch (DLInterruptedException dlie) {
- // if the creation is interrupted, throw the exception without retrie.
- createPromise.updateIfEmpty(new Throw<SessionLock>(dlie));
- return;
- } catch (IOException e) {
- if (numRetries.getAndDecrement() < 0) {
- createPromise.updateIfEmpty(new Throw<SessionLock>(e));
- return;
- }
- createLock(
- lockPath,
- context,
- interruptedException,
- numRetries,
- createPromise,
- zkRetryBackoffMs);
- }
- }
- }, delayMs, TimeUnit.MILLISECONDS);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
deleted file mode 100644
index 02d905d..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Distributed locking mechanism in distributedlog
- */
-package com.twitter.distributedlog.lock;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
deleted file mode 100644
index 81eb5ed..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-
-/**
- * An interface class to read the enveloped entry (serialized bytes of
- * {@link com.twitter.distributedlog.Entry}) from a log segment
- */
-@Beta
-public interface LogSegmentEntryReader extends AsyncCloseable {
-
- interface StateChangeListener {
-
- /**
- * Notify when caught up on inprogress.
- */
- void onCaughtupOnInprogress();
-
- }
-
- /**
- * Start the reader. The method to signal the implementation
- * to start preparing the data for consumption {@link #readNext(int)}
- */
- void start();
-
- /**
- * Register the state change listener
- *
- * @param listener register the state change listener
- * @return entry reader
- */
- LogSegmentEntryReader registerListener(StateChangeListener listener);
-
- /**
- * Unregister the state change listener
- *
- * @param listener register the state change listener
- * @return entry reader
- */
- LogSegmentEntryReader unregisterListener(StateChangeListener listener);
-
- /**
- * Return the log segment metadata for this reader.
- *
- * @return the log segment metadata
- */
- LogSegmentMetadata getSegment();
-
- /**
- * Update the log segment each time when the metadata has changed.
- *
- * @param segment new metadata of the log segment.
- */
- void onLogSegmentMetadataUpdated(LogSegmentMetadata segment);
-
- /**
- * Read next <i>numEntries</i> entries from current log segment.
- * <p>
- * <i>numEntries</i> will be best-effort.
- *
- * @param numEntries num entries to read from current log segment
- * @return A promise that when satisified will contain a non-empty list of entries with their content.
- * @throws {@link com.twitter.distributedlog.exceptions.EndOfLogSegmentException} when
- * read entries beyond the end of a <i>closed</i> log segment.
- */
- Future<List<Entry.Reader>> readNext(int numEntries);
-
- /**
- * Return the last add confirmed entry id (LAC).
- *
- * @return the last add confirmed entry id.
- */
- long getLastAddConfirmed();
-
- /**
- * Is the reader reading beyond last add confirmed.
- *
- * @return true if the reader is reading beyond last add confirmed
- */
- boolean isBeyondLastAddConfirmed();
-
- /**
- * Has the log segment reader caught up with the inprogress log segment.
- *
- * @return true only if the log segment is inprogress and it is caught up, otherwise return false.
- */
- boolean hasCaughtUpOnInprogress();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
deleted file mode 100644
index bcf8129..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * Log Segment Store to read log segments
- */
-@Beta
-public interface LogSegmentEntryStore {
-
- /**
- * Delete the actual log segment from the entry store.
- *
- * @param segment log segment metadata
- * @return future represent the delete result
- */
- Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
-
- /**
- * Create a new log segment allocator for allocating log segment entry writers.
- *
- * @param metadata the metadata for the log stream
- * @return future represent the log segment allocator
- */
- Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
- LogMetadataForWriter metadata,
- DynamicDistributedLogConfiguration dynConf) throws IOException;
-
- /**
- * Open the reader for reading data to the log <i>segment</i>.
- *
- * @param segment the log <i>segment</i> to read data from
- * @param startEntryId the start entry id
- * @return future represent the opened reader
- */
- Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
- long startEntryId);
-
- /**
- * Open the reader for reading entries from a random access log <i>segment</i>.
- *
- * @param segment the log <i>segment</i> to read entries from
- * @param fence the flag to fence log segment
- * @return future represent the opened random access reader
- */
- Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment,
- boolean fence);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
deleted file mode 100644
index 8b7d9b2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.util.Sizable;
-import org.apache.bookkeeper.client.AsyncCallback;
-
-/**
- * An interface class to write the enveloped entry (serialized bytes of
- * {@link Entry} into the log segment.
- *
- * <p>It is typically used by {@link LogSegmentWriter}.
- *
- * @see LogSegmentWriter
- *
- * TODO: The interface is leveraging bookkeeper's callback and status code now
- * Consider making it more generic.
- */
-@Beta
-public interface LogSegmentEntryWriter extends Sizable {
-
- /**
- * Get the log segment id.
- *
- * @return log segment id.
- */
- long getLogSegmentId();
-
- /**
- * Close the entry writer.
- */
- void asyncClose(AsyncCallback.CloseCallback callback, Object ctx);
-
- /**
- * Async add entry to the log segment.
- * <p>The implementation semantic follows
- * {@link org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
- * byte[], int, int, AsyncCallback.AddCallback, Object)}
- *
- * @param data
- * data to add
- * @param offset
- * offset in the data
- * @param length
- * length of the data
- * @param callback
- * callback
- * @param ctx
- * ctx
- * @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
- * byte[], int, int, AsyncCallback.AddCallback, Object)
- */
- void asyncAddEntry(byte[] data, int offset, int length,
- AsyncCallback.AddCallback callback, Object ctx);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
deleted file mode 100644
index f8bf183..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import java.util.Collection;
-
-/**
- * Filter to filter log segments
- */
-public interface LogSegmentFilter {
-
- public static final LogSegmentFilter DEFAULT_FILTER = new LogSegmentFilter() {
- @Override
- public Collection<String> filter(Collection<String> fullList) {
- return fullList;
- }
- };
-
- /**
- * Filter the log segments from the full log segment list.
- *
- * @param fullList
- * full list of log segment names.
- * @return filtered log segment names
- */
- Collection<String> filter(Collection<String> fullList);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
deleted file mode 100644
index d4ca3ea..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Cache the log segment metadata
- */
-public class LogSegmentMetadataCache implements RemovalListener<String, LogSegmentMetadata> {
-
- private static final Logger logger = LoggerFactory.getLogger(LogSegmentMetadataCache.class);
-
- private final Cache<String, LogSegmentMetadata> cache;
- private final boolean isCacheEnabled;
-
- public LogSegmentMetadataCache(DistributedLogConfiguration conf,
- Ticker ticker) {
- cache = CacheBuilder.newBuilder()
- .concurrencyLevel(conf.getNumWorkerThreads())
- .initialCapacity(1024)
- .expireAfterAccess(conf.getLogSegmentCacheTTLMs(), TimeUnit.MILLISECONDS)
- .maximumSize(conf.getLogSegmentCacheMaxSize())
- .removalListener(this)
- .ticker(ticker)
- .recordStats()
- .build();
- this.isCacheEnabled = conf.isLogSegmentCacheEnabled();
- logger.info("Log segment cache is enabled = {}", this.isCacheEnabled);
- }
-
- /**
- * Add the log <i>segment</i> of <i>path</i> to the cache.
- *
- * @param path the path of the log segment
- * @param segment log segment metadata
- */
- public void put(String path, LogSegmentMetadata segment) {
- if (isCacheEnabled) {
- cache.put(path, segment);
- }
- }
-
- /**
- * Invalid the cache entry associated with <i>path</i>.
- *
- * @param path the path of the log segment
- */
- public void invalidate(String path) {
- if (isCacheEnabled) {
- cache.invalidate(path);
- }
- }
-
- /**
- * Retrieve the log segment of <i>path</i> from the cache.
- *
- * @param path the path of the log segment.
- * @return log segment metadata if exists, otherwise null.
- */
- public LogSegmentMetadata get(String path) {
- return cache.getIfPresent(path);
- }
-
- @Override
- public void onRemoval(RemovalNotification<String, LogSegmentMetadata> notification) {
- if (notification.wasEvicted()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Log segment of {} was evicted.", notification.getKey());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
deleted file mode 100644
index dda76e5..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.metadata.LogMetadata;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import java.io.Closeable;
-import java.util.List;
-
-/**
- * Interface for log segment metadata store. All operations that modify log segments should
- * be executed under a {@link Transaction}.
- */
-@Beta
-public interface LogSegmentMetadataStore extends Closeable {
-
- /**
- * Start the transaction on changing log segment metadata store.
- *
- * @return transaction of the log segment metadata store.
- */
- Transaction<Object> transaction();
-
- // The reason to keep storing log segment sequence number & log record transaction id
- // in this log segment metadata store interface is to share the transaction that used
- // to start/complete log segment. It is a bit hard to separate them out right now.
-
- /**
- * Store the maximum log segment sequence number on <code>path</code>.
- *
- * @param txn
- * transaction to execute for storing log segment sequence number.
- * @param logMetadata
- * metadata of the log stream
- * @param sequenceNumber
- * log segment sequence number to store
- * @param listener
- * listener on the result to this operation
- */
- void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
- LogMetadata logMetadata,
- Versioned<Long> sequenceNumber,
- OpListener<Version> listener);
-
- /**
- * Store the maximum transaction id for <code>path</code>
- *
- * @param txn
- * transaction to execute for storing transaction id
- * @param logMetadata
- * metadata of the log stream
- * @param transactionId
- * transaction id to store
- * @param listener
- * listener on the result to this operation
- */
- void storeMaxTxnId(Transaction<Object> txn,
- LogMetadataForWriter logMetadata,
- Versioned<Long> transactionId,
- OpListener<Version> listener);
-
- /**
- * Create a log segment <code>segment</code> under transaction <code>txn</code>.
- *
- * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation
- * immediately. the operation should be executed via {@link Transaction#execute()}
- *
- * @param txn
- * transaction to execute for this operation
- * @param segment
- * segment to create
- * @param opListener
- * the listener on the operation result
- */
- void createLogSegment(Transaction<Object> txn,
- LogSegmentMetadata segment,
- OpListener<Void> opListener);
-
- /**
- * Delete a log segment <code>segment</code> under transaction <code>txn</code>.
- *
- * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation
- * immediately. the operation should be executed via {@link Transaction#execute()}
- *
- * @param txn
- * transaction to execute for this operation
- * @param segment
- * segment to delete
- */
- void deleteLogSegment(Transaction<Object> txn,
- LogSegmentMetadata segment,
- OpListener<Void> opListener);
-
- /**
- * Update a log segment <code>segment</code> under transaction <code>txn</code>.
- *
- * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation
- * immediately. the operation should be executed via {@link Transaction#execute()}
- *
- * @param txn
- * transaction to execute for this operation
- * @param segment
- * segment to update
- */
- void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
-
- /**
- * Retrieve the log segment associated <code>path</code>.
- *
- * @param logSegmentPath
- * path to store log segment metadata
- * @return future of the retrieved log segment metadata
- */
- Future<LogSegmentMetadata> getLogSegment(String logSegmentPath);
-
- /**
- * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i>
- * for subsequent changes for the list of log segments.
- *
- * @param logSegmentsPath
- * path to store list of log segments
- * @param listener
- * log segment listener on log segment changes
- * @return future of the retrieved list of log segment names
- */
- Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
- LogSegmentNamesListener listener);
-
- /**
- * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>.
- *
- * @param logSegmentsPath
- * log segments path
- * @param listener
- * log segment listener on log segment changes
- */
- void unregisterLogSegmentListener(String logSegmentsPath,
- LogSegmentNamesListener listener);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
deleted file mode 100644
index 70472ca..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-
-/**
- * An interface class to read entries {@link com.twitter.distributedlog.Entry}
- * from a random access log segment.
- */
-public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable {
-
- /**
- * Read entries [startEntryId, endEntryId] from a random access log segment.
- *
- * @param startEntryId start entry id
- * @param endEntryId end entry id
- * @return A promise that when satisfied will contain a list of entries of [startEntryId, endEntryId].
- */
- Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId);
-
- /**
- * Return the last add confirmed entry id (LAC).
- *
- * @return the last add confirmed entry id.
- */
- long getLastAddConfirmed();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
deleted file mode 100644
index a0b4610..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.Beta;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * An interface class to write log records into a log segment.
- */
-@Beta
-public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable {
-
- /**
- * Get the unique log segment id.
- *
- * @return log segment id.
- */
- public long getLogSegmentId();
-
- /**
- * Write a log record to a log segment.
- *
- * @param record single log record
- * @return a future representing write result. A {@link DLSN} is returned if write succeeds,
- * otherwise, exceptions are returned.
- * @throws com.twitter.distributedlog.exceptions.LogRecordTooLongException if log record is too long
- * @throws com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException on invalid enveloped entry
- * @throws LockingException if failed to acquire lock for the writer
- * @throws BKTransmitException if failed to transmit data to bk
- * @throws com.twitter.distributedlog.exceptions.WriteException if failed to write to bk
- */
- public Future<DLSN> asyncWrite(LogRecord record);
-
- /**
- * This isn't a simple synchronous version of {@code asyncWrite}. It has different semantic.
- * This method only writes data to the buffer and flushes buffer if needed.
- *
- * TODO: we should remove this method. when we rewrite synchronous writer based on asynchronous writer,
- * since this is the semantic needed to be provided in higher level but just calling write & flush.
- *
- * @param record single log record
- * @throws IOException when tried to flush the buffer.
- * @see LogSegmentWriter#asyncWrite(LogRecord)
- */
- public void write(LogRecord record) throws IOException;
-
- /**
- * Transmit the buffered data and wait for it being persisted and return the last acknowledged
- * transaction id.
- *
- * @return future representing the transmit result with last acknowledged transaction id.
- */
- public Future<Long> flush();
-
- /**
- * Commit the current acknowledged data. It is the consequent operation of {@link #flush()},
- * which makes all the acknowledged data visible to
- *
- * @return future representing the commit result.
- */
- public Future<Long> commit();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
deleted file mode 100644
index 5f88c5a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Managing log segments in local cache.
- *
- * <p>
- * Caching of log segment metadata assumes that the data contained in the ZNodes for individual
- * log segments is never updated after creation i.e we never call setData. A log segment
- * is finalized by creating a new ZNode and deleting the in progress node. This code will have
- * to change if we change the behavior
- * </p>
- */
-public class PerStreamLogSegmentCache {
-
- static final Logger LOG = LoggerFactory.getLogger(PerStreamLogSegmentCache.class);
-
- protected final String streamName;
- protected final boolean validateLogSegmentSequenceNumber;
- protected final Map<String, LogSegmentMetadata> logSegments =
- new HashMap<String, LogSegmentMetadata>();
- protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments =
- new ConcurrentHashMap<Long, LogSegmentMetadata>();
-
- @VisibleForTesting
- PerStreamLogSegmentCache(String streamName) {
- this(streamName, true);
- }
-
- public PerStreamLogSegmentCache(String streamName,
- boolean validateLogSegmentSequenceNumber) {
- this.streamName = streamName;
- this.validateLogSegmentSequenceNumber = validateLogSegmentSequenceNumber;
- }
-
- /**
- * Retrieve log segments from the cache.
- *
- * - first sort the log segments in ascending order
- * - do validation and assign corresponding sequence id
- * - apply comparator after validation
- *
- * @param comparator
- * comparator to sort the returned log segments.
- * @return list of sorted and filtered log segments.
- * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap)
- */
- public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator)
- throws UnexpectedException {
- List<LogSegmentMetadata> segmentsToReturn;
- synchronized (logSegments) {
- segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size());
- segmentsToReturn.addAll(logSegments.values());
- }
- Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR);
-
- LogSegmentMetadata prevSegment = null;
- if (validateLogSegmentSequenceNumber) {
- // validation ledger sequence number to ensure the log segments are unique.
- for (int i = 0; i < segmentsToReturn.size(); i++) {
- LogSegmentMetadata segment = segmentsToReturn.get(i);
-
- if (null != prevSegment
- && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
- && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
- && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) {
- LOG.error("{} found ledger sequence number gap between log segment {} and {}",
- new Object[] { streamName, prevSegment, segment });
- throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment "
- + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber());
- }
- prevSegment = segment;
- }
- }
-
- prevSegment = null;
- long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
- for (int i = 0; i < segmentsToReturn.size(); i++) {
- LogSegmentMetadata segment = segmentsToReturn.get(i);
- // assign sequence id
- if (!segment.isInProgress()) {
- if (segment.supportsSequenceId()) {
- startSequenceId = segment.getStartSequenceId() + segment.getRecordCount();
- if (null != prevSegment && prevSegment.supportsSequenceId()
- && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) {
- LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}",
- new Object[] { streamName, segment, prevSegment });
- }
- } else {
- startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
- }
- } else {
- if (segment.supportsSequenceId()) {
- LogSegmentMetadata newSegment = segment.mutator()
- .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId)
- .build();
- segmentsToReturn.set(i, newSegment);
- }
-
- break;
- }
- prevSegment = segment;
- }
- if (comparator != LogSegmentMetadata.COMPARATOR) {
- Collections.sort(segmentsToReturn, comparator);
- }
- return segmentsToReturn;
- }
-
- /**
- * Add the segment <i>metadata</i> for <i>name</i> in the cache.
- *
- * @param name
- * segment name.
- * @param metadata
- * segment metadata.
- */
- public void add(String name, LogSegmentMetadata metadata) {
- synchronized (logSegments) {
- if (!logSegments.containsKey(name)) {
- logSegments.put(name, metadata);
- LOG.info("{} added log segment ({} : {}) to cache.",
- new Object[]{ streamName, name, metadata });
- }
- LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLogSegmentId());
- if (null == oldMetadata) {
- lid2LogSegments.put(metadata.getLogSegmentId(), metadata);
- } else {
- if (oldMetadata.isInProgress() && !metadata.isInProgress()) {
- lid2LogSegments.put(metadata.getLogSegmentId(), metadata);
- } else {
- lid2LogSegments.put(oldMetadata.getLogSegmentId(), oldMetadata);
- }
- }
- }
- }
-
- /**
- * Retrieve log segment <code>name</code> from the cache.
- *
- * @param name
- * name of the log segment.
- * @return log segment metadata
- */
- public LogSegmentMetadata get(String name) {
- synchronized (logSegments) {
- return logSegments.get(name);
- }
- }
-
- /**
- * Update the log segment cache with removed/added segments.
- *
- * @param segmentsRemoved
- * segments that removed
- * @param segmentsAdded
- * segments that added
- */
- public void update(Set<String> segmentsRemoved,
- Map<String, LogSegmentMetadata> segmentsAdded) {
- synchronized (logSegments) {
- for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) {
- add(entry.getKey(), entry.getValue());
- }
- for (String segment : segmentsRemoved) {
- remove(segment);
- }
- }
- }
-
- /**
- * Diff with new received segment list <code>segmentReceived</code>.
- *
- * @param segmentsReceived
- * new received segment list
- * @return segments added (left) and removed (right).
- */
- public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) {
- Set<String> segmentsAdded;
- Set<String> segmentsRemoved;
- synchronized (logSegments) {
- Set<String> segmentsCached = logSegments.keySet();
- segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy();
- segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy();
- }
- return Pair.of(segmentsAdded, segmentsRemoved);
- }
-
- /**
- * Remove log segment <code>name</code> from the cache.
- *
- * @param name
- * name of the log segment.
- * @return log segment metadata.
- */
- public LogSegmentMetadata remove(String name) {
- synchronized (logSegments) {
- LogSegmentMetadata metadata = logSegments.remove(name);
- if (null != metadata) {
- lid2LogSegments.remove(metadata.getLogSegmentId(), metadata);
- LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata);
- }
- return metadata;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
deleted file mode 100644
index 0101bff..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-
-public interface RollingPolicy {
- /**
- * Determines if a rollover may be appropriate at this time.
- *
- * @param sizable
- * Any object that is sizable.
- * @param lastRolloverTimeMs
- * last rolling time in millis.
- * @return true if a rollover is required. otherwise, false.
- */
- boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
deleted file mode 100644
index 8b1fa0f..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-
-public class SizeBasedRollingPolicy implements RollingPolicy {
-
- final long maxSize;
-
- public SizeBasedRollingPolicy(long maxSize) {
- this.maxSize = maxSize;
- }
-
- @Override
- public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) {
- return sizable.size() > maxSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
deleted file mode 100644
index bc88720..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.twitter.distributedlog.util.Sizable;
-import com.twitter.distributedlog.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TimeBasedRollingPolicy implements RollingPolicy {
-
- final static Logger LOG = LoggerFactory.getLogger(TimeBasedRollingPolicy.class);
-
- final long rollingIntervalMs;
-
- public TimeBasedRollingPolicy(long rollingIntervalMs) {
- this.rollingIntervalMs = rollingIntervalMs;
- }
-
- @Override
- public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) {
- long elapsedMs = Utils.elapsedMSec(lastRolloverTimeMs);
- boolean shouldSwitch = elapsedMs > rollingIntervalMs;
- if (shouldSwitch) {
- LOG.debug("Last Finalize Time: {} elapsed time (MSec): {}", lastRolloverTimeMs,
- elapsedMs);
- }
- return shouldSwitch;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
deleted file mode 100644
index 0f5b877..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Log Segment Management
- */
-package com.twitter.distributedlog.logsegment;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
deleted file mode 100644
index 178074a..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import java.io.IOException;
-
-/**
- * Specific config of a given implementation of DL
- */
-public interface DLConfig {
- /**
- * Serialize the dl config into a string.
- */
- public String serialize();
-
- /**
- * Deserialize the dl config from a readable stream.
- *
- * @param data
- * bytes to desrialize dl config.
- * @throws IOException if fail to deserialize the dl config string representation.
- */
- public void deserialize(byte[] data) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
deleted file mode 100644
index c0b5fb7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.URI;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Metadata of a given DL instance.
- */
-public class DLMetadata {
-
- static final Logger LOG = LoggerFactory.getLogger(DLMetadata.class);
-
- static final String LINE_SPLITTER = "\n";
- static final String BK_DL_TYPE = "BKDL";
- static final int METADATA_FORMAT_VERSION = 1;
-
- // metadata format version
- private int metadataFormatVersion = 0;
- // underlying dl type
- private String dlType;
- // underlying dl config
- private DLConfig dlConfig;
-
- public DLMetadata(String dlType, DLConfig dlConfig) {
- this(dlType, dlConfig, METADATA_FORMAT_VERSION);
- }
-
- DLMetadata(String dlType, DLConfig dlConfig, int metadataFormatVersion) {
- this.dlType = dlType;
- this.dlConfig = dlConfig;
- this.metadataFormatVersion = metadataFormatVersion;
- }
-
- /**
- * @return DL type
- */
- public String getDLType() {
- return dlType;
- }
-
- /**
- * @return DL Config
- */
- public DLConfig getDLConfig() {
- return dlConfig;
- }
-
- /**
- * Serialize the DL metadata into bytes array.
- *
- * @return bytes of DL metadata.
- */
- public byte[] serialize() {
- StringBuilder sb = new StringBuilder();
- sb.append(metadataFormatVersion).append(LINE_SPLITTER);
- sb.append(dlType).append(LINE_SPLITTER);
- sb.append(dlConfig.serialize());
- LOG.debug("Serialized dl metadata {}.", sb);
- return sb.toString().getBytes(UTF_8);
- }
-
- @Override
- public int hashCode() {
- return dlType.hashCode() * 13 + dlConfig.hashCode();
- }
-
- @Override
- public String toString() {
- return new String(serialize(), UTF_8);
- }
-
- public void update(URI uri) throws IOException {
- DistributedLogConfiguration conf = new DistributedLogConfiguration();
- ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkAclId(conf.getZkAclId())
- .uri(uri)
- .build();
- byte[] data = serialize();
- try {
- zkc.get().setData(uri.getPath(), data, -1);
- } catch (KeeperException e) {
- throw new IOException("Fail to update dl metadata " + new String(data, UTF_8)
- + " to uri " + uri, e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted when updating dl metadata "
- + new String(data, UTF_8) + " to uri " + uri, e);
- } finally {
- zkc.close();
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof DLMetadata)) {
- return false;
- }
- DLMetadata other = (DLMetadata) o;
- return dlType.equals(other.dlType) && dlConfig.equals(other.dlConfig);
- }
-
- public void create(URI uri) throws IOException {
- DistributedLogConfiguration conf = new DistributedLogConfiguration();
- ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkAclId(conf.getZkAclId())
- .uri(uri)
- .build();
- byte[] data = serialize();
- try {
- Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data,
- zkc.getDefaultACL(), CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- throw new IOException("Fail to write dl metadata " + new String(data, UTF_8)
- + " to uri " + uri, e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted when writing dl metadata " + new String(data, UTF_8)
- + " to uri " + uri, e);
- } finally {
- zkc.close();
- }
- }
-
- public static void unbind(URI uri) throws IOException {
- DistributedLogConfiguration conf = new DistributedLogConfiguration();
- ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
- .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
- .retryThreadCount(conf.getZKClientNumberRetryThreads())
- .requestRateLimit(conf.getZKRequestRateLimit())
- .zkAclId(conf.getZkAclId())
- .uri(uri)
- .build();
- byte[] data = new byte[0];
- try {
- zkc.get().setData(uri.getPath(), data, -1);
- } catch (KeeperException ke) {
- throw new IOException("Fail to unbound dl metadata on uri " + uri, ke);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted when unbinding dl metadata on uri " + uri, ie);
- } finally {
- zkc.close();
- }
- }
-
- /**
- * Deserialize dl metadata of given <i>uri</i> from a given bytes array.
- *
- * @param uri
- * uri that stored dl metadata bindings
- * @param data
- * bytes of dl metadata
- * @return dl metadata
- * @throws IOException if failed to parse the bytes array
- */
- public static DLMetadata deserialize(URI uri, byte[] data) throws IOException {
- String metadata = new String(data, UTF_8);
- LOG.debug("Parsing dl metadata {}.", metadata);
- BufferedReader br = new BufferedReader(new StringReader(metadata));
- String versionLine = br.readLine();
- if (null == versionLine) {
- throw new IOException("Empty DL Metadata.");
- }
- int version;
- try {
- version = Integer.parseInt(versionLine);
- } catch (NumberFormatException nfe) {
- version = -1;
- }
- if (METADATA_FORMAT_VERSION != version) {
- throw new IOException("Metadata version not compatible. Expected "
- + METADATA_FORMAT_VERSION + " but got " + version);
- }
- String type = br.readLine();
- if (!BK_DL_TYPE.equals(type)) {
- throw new IOException("Invalid DL type : " + type);
- }
- DLConfig dlConfig = new BKDLConfig(uri);
- StringBuilder sb = new StringBuilder();
- String line;
- while (null != (line = br.readLine())) {
- sb.append(line);
- }
- dlConfig.deserialize(sb.toString().getBytes(UTF_8));
- return new DLMetadata(type, dlConfig, version);
- }
-
- public static DLMetadata create(BKDLConfig bkdlConfig) {
- return new DLMetadata(BK_DL_TYPE, bkdlConfig);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
deleted file mode 100644
index b2a417e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-
-public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStoreUpdater {
-
- public DryrunLogSegmentMetadataStoreUpdater(DistributedLogConfiguration conf,
- LogSegmentMetadataStore metadataStore) {
- super(conf, metadataStore);
- }
-
- @Override
- public Transaction<Object> transaction() {
- return new Transaction<Object>() {
- @Override
- public void addOp(Op<Object> operation) {
- // no-op
- }
-
- @Override
- public Future<Void> execute() {
- return Future.Void();
- }
-
- @Override
- public void abort(Throwable reason) {
- // no-op
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
deleted file mode 100644
index c878d68..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import java.net.URI;
-
-/**
- * Class to represent the layout and metadata of the zookeeper-based log metadata
- */
-public class LogMetadata {
-
- protected static String getLogComponentPath(URI uri, String logName, String logIdentifier, String component) {
- return String.format("%s/%s/%s%s", uri.getPath(), logName, logIdentifier, component);
- }
-
- /**
- * Get the top stream path for a given log.
- *
- * @param uri namespace to store the log
- * @param logName name of the log
- * @return top stream path
- */
- public static String getLogStreamPath(URI uri, String logName) {
- return String.format("%s/%s", uri.getPath(), logName);
- }
-
- /**
- * Get the log root path for a given log.
- *
- * @param uri
- * namespace to store the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- * @return log root path
- */
- public static String getLogRootPath(URI uri, String logName, String logIdentifier) {
- return getLogComponentPath(uri, logName, logIdentifier, "");
- }
-
- /**
- * Get the logsegments root path for a given log.
- *
- * @param uri
- * namespace to store the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- * @return logsegments root path
- */
- public static String getLogSegmentsPath(URI uri, String logName, String logIdentifier) {
- return getLogComponentPath(uri, logName, logIdentifier, LOGSEGMENTS_PATH);
- }
-
- public static final int LAYOUT_VERSION = -1;
- public final static String LOGSEGMENTS_PATH = "/ledgers";
- public final static String VERSION_PATH = "/version";
- // writer znodes
- public final static String MAX_TXID_PATH = "/maxtxid";
- public final static String LOCK_PATH = "/lock";
- public final static String ALLOCATION_PATH = "/allocation";
- // reader znodes
- public final static String READ_LOCK_PATH = "/readLock";
-
- protected final URI uri;
- protected final String logName;
- protected final String logIdentifier;
-
- // Root path of the log
- protected final String logRootPath;
- // Components
- protected final String logSegmentsPath;
- protected final String lockPath;
- protected final String maxTxIdPath;
- protected final String allocationPath;
-
- /**
- * metadata representation of a log
- *
- * @param uri
- * namespace to store the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- */
- protected LogMetadata(URI uri,
- String logName,
- String logIdentifier) {
- this.uri = uri;
- this.logName = logName;
- this.logIdentifier = logIdentifier;
- this.logRootPath = getLogRootPath(uri, logName, logIdentifier);
- this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
- this.lockPath = logRootPath + LOCK_PATH;
- this.maxTxIdPath = logRootPath + MAX_TXID_PATH;
- this.allocationPath = logRootPath + ALLOCATION_PATH;
- }
-
- public URI getUri() {
- return uri;
- }
-
- public String getLogName() {
- return logName;
- }
-
- /**
- * Get the root path of the log.
- *
- * @return root path of the log.
- */
- public String getLogRootPath() {
- return logRootPath;
- }
-
- /**
- * Get the root path for log segments.
- *
- * @return root path for log segments
- */
- public String getLogSegmentsPath() {
- return this.logSegmentsPath;
- }
-
- /**
- * Get the path for a log segment of the log.
- *
- * @param segmentName
- * segment name
- * @return path for the log segment
- */
- public String getLogSegmentPath(String segmentName) {
- return this.logSegmentsPath + "/" + segmentName;
- }
-
- public String getLockPath() {
- return lockPath;
- }
-
- public String getMaxTxIdPath() {
- return maxTxIdPath;
- }
-
- public String getAllocationPath() {
- return allocationPath;
- }
-
- /**
- * Get the fully qualified name of the log.
- *
- * @return fully qualified name
- */
- public String getFullyQualifiedName() {
- return String.format("%s:%s", logName, logIdentifier);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
deleted file mode 100644
index ff6bfca..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.base.Optional;
-
-import java.net.URI;
-
-/**
- * Log Metadata for Reader
- */
-public class LogMetadataForReader extends LogMetadata {
-
- /**
- * Get the root path to store subscription infos of a log.
- *
- * @param uri
- * namespace of the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- * @return subscribers root path
- */
- public static String getSubscribersPath(URI uri, String logName, String logIdentifier) {
- return getLogComponentPath(uri, logName, logIdentifier, SUBSCRIBERS_PATH);
- }
-
- /**
- * Get the path that stores subscription info for a <code>subscriberId</code> for a <code>log</code>.
- *
- * @param uri
- * namespace of the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- * @param subscriberId
- * subscriber id of the log
- * @return subscriber's path
- */
- public static String getSubscriberPath(URI uri, String logName, String logIdentifier, String subscriberId) {
- return String.format("%s/%s", getSubscribersPath(uri, logName, logIdentifier), subscriberId);
- }
-
- /**
- * Create a metadata representation of a log for reader.
- *
- * @param uri
- * namespace to store the log
- * @param logName
- * name of the log
- * @param logIdentifier
- * identifier of the log
- * @return metadata representation of a log for reader
- */
- public static LogMetadataForReader of(URI uri, String logName, String logIdentifier) {
- return new LogMetadataForReader(uri, logName, logIdentifier);
- }
-
- final static String SUBSCRIBERS_PATH = "/subscribers";
-
- /**
- * metadata representation of a log
- *
- * @param uri namespace to store the log
- * @param logName name of the log
- * @param logIdentifier identifier of the log
- */
- private LogMetadataForReader(URI uri, String logName, String logIdentifier) {
- super(uri, logName, logIdentifier);
- }
-
- /**
- * Get the readlock path for the log or a subscriber of the log.
- *
- * @param subscriberId
- * subscriber id. it is optional.
- * @return read lock path
- */
- public String getReadLockPath(Optional<String> subscriberId) {
- if (subscriberId.isPresent()) {
- return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + READ_LOCK_PATH;
- } else {
- return logRootPath + READ_LOCK_PATH;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
deleted file mode 100644
index 2284cbb..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import org.apache.bookkeeper.versioning.Versioned;
-
-import java.net.URI;
-
-/**
- * Log Metadata for writer
- */
-public class LogMetadataForWriter extends LogMetadata {
-
- private final Versioned<byte[]> maxLSSNData;
- private final Versioned<byte[]> maxTxIdData;
- private final Versioned<byte[]> allocationData;
-
- /**
- * metadata representation of a log
- *
- * @param uri namespace to store the log
- * @param logName name of the log
- * @param logIdentifier identifier of the log
- */
- public LogMetadataForWriter(URI uri,
- String logName,
- String logIdentifier,
- Versioned<byte[]> maxLSSNData,
- Versioned<byte[]> maxTxIdData,
- Versioned<byte[]> allocationData) {
- super(uri, logName, logIdentifier);
- this.maxLSSNData = maxLSSNData;
- this.maxTxIdData = maxTxIdData;
- this.allocationData = allocationData;
- }
-
- public Versioned<byte[]> getMaxLSSNData() {
- return maxLSSNData;
- }
-
- public Versioned<byte[]> getMaxTxIdData() {
- return maxTxIdData;
- }
-
- public Versioned<byte[]> getAllocationData() {
- return allocationData;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
deleted file mode 100644
index 01dccb7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.util.Future;
-
-import java.net.URI;
-import java.util.Iterator;
-
-/**
- * Interface for log metadata store.
- */
-@Beta
-public interface LogMetadataStore {
-
- /**
- * Create a stream and return it is namespace location.
- *
- * @param logName
- * name of the log
- * @return namespace location that stores this stream.
- */
- Future<URI> createLog(String logName);
-
- /**
- * Get the location of the log.
- *
- * @param logName
- * name of the log
- * @return namespace location that stores this stream.
- */
- Future<Optional<URI>> getLogLocation(String logName);
-
- /**
- * Retrieves logs from the namespace.
- *
- * @return iterator of logs of the namespace.
- */
- Future<Iterator<String>> getLogs();
-
- /**
- * Register a namespace listener on streams changes.
- *
- * @param listener
- * namespace listener
- */
- void registerNamespaceListener(NamespaceListener listener);
-}