You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:13 UTC
[06/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
deleted file mode 100644
index 862f05a..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.util.Utils;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the PlacementStateManager that saves data to and loads data from ZooKeeper,
- * so that resource placement does not require an additional system.
- */
-public class ZKPlacementStateManager implements PlacementStateManager {
-
- private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
-
- private static final String SERVER_LOAD_DIR = "/.server-load";
-
- private final String serverLoadPath;
- private final ZooKeeperClient zkClient;
-
- private boolean watching = false;
-
- public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
- String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
- zkClient = BKNamespaceDriver.createZKClientBuilder(
- String.format("ZKPlacementStateManager-%s", zkServers),
- conf,
- zkServers,
- statsLogger.scope("placement_state_manager")).build();
- serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
- }
-
- private void createServerLoadPathIfNoExists(byte[] data)
- throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
- try {
- Utils.zkCreateFullPathOptimistic(
- zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
- }
- }
-
- @Override
- public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
- logger.info("saving ownership");
- try {
- ZooKeeper zk = zkClient.get();
- // use timestamp as data so watchers will see any changes
- byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
- if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
- createServerLoadPathIfNoExists(timestamp);
- }
-
- Transaction tx = zk.transaction();
- List<String> children = zk.getChildren(serverLoadPath, false);
- HashSet<String> servers = new HashSet<String>(children);
- tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
- for (ServerLoad serverLoad : serverLoads) {
- String server = serverToZkFormat(serverLoad.getServer());
- String serverPath = serverPath(server);
- if (servers.contains(server)) {
- servers.remove(server);
- tx.setData(serverPath, serverLoad.serialize(), -1);
- } else {
- tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
- }
- }
- for (String server : servers) {
- tx.delete(serverPath(server), -1);
- }
- tx.commit();
- } catch (InterruptedException | IOException | KeeperException e) {
- throw new StateManagerSaveException(e);
- }
- }
-
- @Override
- public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- try {
- ZooKeeper zk = zkClient.get();
- List<String> children = zk.getChildren(serverLoadPath, false);
- for (String server : children) {
- ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
- }
- return ownerships;
- } catch (InterruptedException | IOException | KeeperException e) {
- throw new StateManagerLoadException(e);
- }
- }
-
- @Override
- public synchronized void watch(final PlacementCallback callback) {
- if (watching) {
- return; // do not double watch
- }
- watching = true;
-
- try {
- ZooKeeper zk = zkClient.get();
- try {
- zk.getData(serverLoadPath, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- try {
- callback.callback(loadOwnership());
- } catch (StateManagerLoadException e) {
- logger.error("Watch of Ownership failed", e);
- } finally {
- watching = false;
- watch(callback);
- }
- }
- }, new Stat());
- } catch (KeeperException.NoNodeException nee) {
- byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
- createServerLoadPathIfNoExists(timestamp);
- watching = false;
- watch(callback);
- }
- } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
- logger.error("Watch of Ownership failed", e);
- watching = false;
- watch(callback);
- }
- }
-
- public String serverPath(String server) {
- return String.format("%s/%s", serverLoadPath, server);
- }
-
- protected String serverToZkFormat(String server) {
- return server.replaceAll("/", "--");
- }
-
- protected String zkFormatToServer(String zkFormattedServer) {
- return zkFormattedServer.replaceAll("--", "/");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
deleted file mode 100644
index ea79251..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Placement Policy to place streams across proxy services.
- */
-package org.apache.distributedlog.service.placement;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
deleted file mode 100644
index 83ac668..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.exceptions.ChecksumFailedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Try;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Abstract Stream Operation.
- */
-public abstract class AbstractStreamOp<Response> implements StreamOp {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
-
- protected final String stream;
- protected final OpStatsLogger opStatsLogger;
- private final Promise<Response> result = new Promise<Response>();
- protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
- protected final Long checksum;
- protected final Feature checksumDisabledFeature;
-
- public AbstractStreamOp(String stream,
- OpStatsLogger statsLogger,
- Long checksum,
- Feature checksumDisabledFeature) {
- this.stream = stream;
- this.opStatsLogger = statsLogger;
- // Start the stopwatch here in case the operation fails before it is executed.
- stopwatch.reset().start();
- this.checksum = checksum;
- this.checksumDisabledFeature = checksumDisabledFeature;
- }
-
- @Override
- public String streamName() {
- return stream;
- }
-
- @Override
- public Stopwatch stopwatch() {
- return stopwatch;
- }
-
- @Override
- public void preExecute() throws DLException {
- if (!checksumDisabledFeature.isAvailable() && null != checksum) {
- Long serverChecksum = computeChecksum();
- if (null != serverChecksum && !checksum.equals(serverChecksum)) {
- throw new ChecksumFailedException();
- }
- }
- }
-
- @Override
- public Long computeChecksum() {
- return null;
- }
-
- @Override
- public Future<Void> execute(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) {
- stopwatch.reset().start();
- return executeOp(writer, sequencer, txnLock)
- .addEventListener(new FutureEventListener<Response>() {
- @Override
- public void onSuccess(Response response) {
- opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- setResponse(response);
- }
- @Override
- public void onFailure(Throwable cause) {
- }
- }).voided();
- }
-
- /**
- * Fail with current <i>owner</i> and its reason <i>cause</i>.
- *
- * @param cause
- * failure reason
- */
- @Override
- public void fail(Throwable cause) {
- if (cause instanceof OwnershipAcquireFailedException) {
- // Ownership exception is a control exception, not an error, so we don't stat
- // it with the other errors.
- OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
- fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner()));
- } else {
- opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- fail(ResponseUtils.exceptionToHeader(cause));
- }
- }
-
- protected void setResponse(Response response) {
- Return<Response> responseTry = new Return(response);
- boolean isEmpty = result.updateIfEmpty(responseTry);
- if (!isEmpty) {
- Option<Try<Response>> resultTry = result.poll();
- logger.error("Result set multiple times. Value='{}', New='{}'", resultTry, responseTry);
- }
- }
-
- /**
- * Return the full response, header and body.
- *
- * @return A future containing the response or the exception
- * encountered by the op if it failed.
- */
- public Future<Response> result() {
- return result;
- }
-
- /**
- * Execute the operation and return its corresponding response.
- *
- * @param writer
- * writer to execute the operation.
- * @param sequencer
- * sequencer used for generating transaction id for stream operations
- * @param txnLock
- * transaction lock to guarantee ordering of transaction id
- * @return future representing the operation.
- */
- protected abstract Future<Response> executeOp(AsyncLogWriter writer,
- Sequencer sequencer,
- Object txnLock);
-
- // fail the result with the given response header
- protected abstract void fail(ResponseHeader header);
-
- public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
- return requestLogger(statsLogger).getOpStatsLogger(opName);
- }
-
- public static StatsLogger requestLogger(StatsLogger statsLogger) {
- return statsLogger.scope("request");
- }
-
- public static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
- return requestLogger(statsLogger).scope(scope);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
deleted file mode 100644
index 77c7d71..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Abstract Write Operation.
- */
-public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
-
- protected AbstractWriteOp(String stream,
- OpStatsLogger statsLogger,
- Long checksum,
- Feature checksumDisabledFeature) {
- super(stream, statsLogger, checksum, checksumDisabledFeature);
- }
-
- @Override
- protected void fail(ResponseHeader header) {
- setResponse(ResponseUtils.write(header));
- }
-
- @Override
- public Long computeChecksum() {
- return ProtocolUtils.streamOpCRC32(stream);
- }
-
- @Override
- public Future<ResponseHeader> responseHeader() {
- return result().map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
- @Override
- public ResponseHeader apply(WriteResponse response) {
- return response.getHeader();
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
deleted file mode 100644
index 6c98468..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.LockingException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.ConstFuture;
-import com.twitter.util.Future;
-import com.twitter.util.Future$;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Try;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Bulk Write Operation.
- */
-public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
- private final List<ByteBuffer> buffers;
- private final long payloadSize;
-
- // Stats
- private final Counter deniedBulkWriteCounter;
- private final Counter successRecordCounter;
- private final Counter failureRecordCounter;
- private final Counter redirectRecordCounter;
- private final OpStatsLogger latencyStat;
- private final Counter bytes;
- private final Counter bulkWriteBytes;
-
- private final AccessControlManager accessControlManager;
-
- // We need to pass these through to preserve ownership change behavior in
- // client/server. Only include failures which are guaranteed to have failed
- // all subsequent writes.
- private boolean isDefiniteFailure(Try<DLSN> result) {
- boolean def = false;
- try {
- result.get();
- } catch (Exception ex) {
- if (ex instanceof OwnershipAcquireFailedException
- || ex instanceof AlreadyClosedException
- || ex instanceof LockingException) {
- def = true;
- }
- }
- return def;
- }
-
- public BulkWriteOp(String stream,
- List<ByteBuffer> buffers,
- StatsLogger statsLogger,
- StatsLogger perStreamStatsLogger,
- StreamPartitionConverter streamPartitionConverter,
- Long checksum,
- Feature checksumDisabledFeature,
- AccessControlManager accessControlManager) {
- super(stream, requestStat(statsLogger, "bulkWrite"), checksum, checksumDisabledFeature);
- this.buffers = buffers;
- long total = 0;
- // Compute the total payload size here because the ByteBuffers are mutable.
- for (ByteBuffer bb : buffers) {
- total += bb.remaining();
- }
- this.payloadSize = total;
-
- final Partition partition = streamPartitionConverter.convert(stream);
- // Write record stats
- StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
- this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
- this.successRecordCounter = streamOpStats.recordsCounter("success");
- this.failureRecordCounter = streamOpStats.recordsCounter("failure");
- this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
- this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
- this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
- this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
-
- this.accessControlManager = accessControlManager;
-
- final long size = getPayloadSize();
- result().addEventListener(new FutureEventListener<BulkWriteResponse>() {
- @Override
- public void onSuccess(BulkWriteResponse response) {
- if (response.getHeader().getCode() == StatusCode.SUCCESS) {
- latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
- bytes.add(size);
- bulkWriteBytes.add(size);
- } else {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
- }
- }
- @Override
- public void onFailure(Throwable cause) {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
- }
- });
- }
-
- @Override
- public void preExecute() throws DLException {
- if (!accessControlManager.allowWrite(stream)) {
- deniedBulkWriteCounter.inc();
- throw new RequestDeniedException(stream, "bulkWrite");
- }
- super.preExecute();
- }
-
- @Override
- public long getPayloadSize() {
- return payloadSize;
- }
-
- @Override
- protected Future<BulkWriteResponse> executeOp(AsyncLogWriter writer,
- Sequencer sequencer,
- Object txnLock) {
- // Need to convert input buffers to LogRecords.
- List<LogRecord> records;
- Future<List<Future<DLSN>>> futureList;
- synchronized (txnLock) {
- records = asRecordList(buffers, sequencer);
- futureList = writer.writeBulk(records);
- }
-
- // Collect into a list of tries to make it easier to extract exception or DLSN.
- Future<List<Try<DLSN>>> writes = asTryList(futureList);
-
- Future<BulkWriteResponse> response = writes.flatMap(
- new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() {
- @Override
- public Future<BulkWriteResponse> apply(List<Try<DLSN>> results) {
-
- // Considered a success at batch level even if no individual writes succeed.
- // The reason is that it's impossible to make an appropriate decision regarding retries without
- // individual buffer failure reasons.
- List<WriteResponse> writeResponses = new ArrayList<WriteResponse>(results.size());
- BulkWriteResponse bulkWriteResponse =
- ResponseUtils.bulkWriteSuccess().setWriteResponses(writeResponses);
-
- // Promote the first result to an op-level failure if we're sure all other writes have
- // failed.
- if (results.size() > 0) {
- Try<DLSN> firstResult = results.get(0);
- if (isDefiniteFailure(firstResult)) {
- return new ConstFuture(firstResult);
- }
- }
-
- // Translate all futures to write responses.
- Iterator<Try<DLSN>> iterator = results.iterator();
- while (iterator.hasNext()) {
- Try<DLSN> completedFuture = iterator.next();
- try {
- DLSN dlsn = completedFuture.get();
- WriteResponse writeResponse = ResponseUtils.writeSuccess().setDlsn(dlsn.serialize());
- writeResponses.add(writeResponse);
- successRecordCounter.inc();
- } catch (Exception ioe) {
- WriteResponse writeResponse = ResponseUtils.write(ResponseUtils.exceptionToHeader(ioe));
- writeResponses.add(writeResponse);
- if (StatusCode.FOUND == writeResponse.getHeader().getCode()) {
- redirectRecordCounter.inc();
- } else {
- failureRecordCounter.inc();
- }
- }
- }
-
- return Future.value(bulkWriteResponse);
- }
- }
- );
-
- return response;
- }
-
- private List<LogRecord> asRecordList(List<ByteBuffer> buffers, Sequencer sequencer) {
- List<LogRecord> records = new ArrayList<LogRecord>(buffers.size());
- for (ByteBuffer buffer : buffers) {
- byte[] payload = new byte[buffer.remaining()];
- buffer.get(payload);
- records.add(new LogRecord(sequencer.nextId(), payload));
- }
- return records;
- }
-
- private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> futureList) {
- return futureList.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() {
- @Override
- public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> results) {
- return Future$.MODULE$.collectToTry(results);
- }
- });
- }
-
- @Override
- protected void fail(ResponseHeader header) {
- if (StatusCode.FOUND == header.getCode()) {
- redirectRecordCounter.add(buffers.size());
- } else {
- failureRecordCounter.add(buffers.size());
- }
- setResponse(ResponseUtils.bulkWrite(header));
- }
-
- @Override
- public Future<ResponseHeader> responseHeader() {
- return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
- @Override
- public ResponseHeader apply(BulkWriteResponse response) {
- return response.getHeader();
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
deleted file mode 100644
index 3ecb46f..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Operation to delete a log stream.
- */
-public class DeleteOp extends AbstractWriteOp {
- private final StreamManager streamManager;
- private final Counter deniedDeleteCounter;
- private final AccessControlManager accessControlManager;
-
- public DeleteOp(String stream,
- StatsLogger statsLogger,
- StatsLogger perStreamStatsLogger,
- StreamManager streamManager,
- Long checksum,
- Feature checksumEnabledFeature,
- AccessControlManager accessControlManager) {
- super(stream, requestStat(statsLogger, "delete"), checksum, checksumEnabledFeature);
- StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
- this.deniedDeleteCounter = streamOpStats.requestDeniedCounter("delete");
- this.accessControlManager = accessControlManager;
- this.streamManager = streamManager;
- }
-
- @Override
- protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
- Sequencer sequencer,
- Object txnLock) {
- Future<Void> result = streamManager.deleteAndRemoveAsync(streamName());
- return result.map(new AbstractFunction1<Void, WriteResponse>() {
- @Override
- public WriteResponse apply(Void value) {
- return ResponseUtils.writeSuccess();
- }
- });
- }
-
- @Override
- public void preExecute() throws DLException {
- if (!accessControlManager.allowTruncate(stream)) {
- deniedDeleteCounter.inc();
- throw new RequestDeniedException(stream, "delete");
- }
- super.preExecute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
deleted file mode 100644
index 0ffa619..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Stream operation that serves a heartbeat request.
- *
- * <p>By default the operation answers with success without writing anything. Once
- * {@link #setWriteControlRecord(boolean)} has been switched on, it writes a control
- * record carrying {@link #HEARTBEAT_DATA} and returns the resulting DLSN.
- */
-public class HeartbeatOp extends AbstractWriteOp {
-
-    static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);
-
-    private final byte dlsnVersion;
-    private final Counter deniedHeartbeatCounter;
-    private final AccessControlManager accessControlManager;
-
-    private boolean writeControlRecord = false;
-
-    /**
-     * Build a heartbeat operation for {@code stream}.
-     *
-     * @param stream name of the stream the heartbeat targets
-     * @param statsLogger logger used for the "heartbeat" request stat and the denied counter
-     * @param perStreamStatsLogger per-stream logger handed to {@link StreamOpStats}
-     * @param dlsnVersion version used when serializing the DLSN into the response
-     * @param checksum request checksum, forwarded to the parent op
-     * @param checksumDisabledFeature checksum feature flag, forwarded to the parent op
-     * @param accessControlManager consulted by {@link #preExecute()}
-     */
-    public HeartbeatOp(String stream,
-                       StatsLogger statsLogger,
-                       StatsLogger perStreamStatsLogger,
-                       byte dlsnVersion,
-                       Long checksum,
-                       Feature checksumDisabledFeature,
-                       AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "heartbeat"), checksum, checksumDisabledFeature);
-        this.deniedHeartbeatCounter =
-            new StreamOpStats(statsLogger, perStreamStatsLogger).requestDeniedCounter("heartbeat");
-        this.accessControlManager = accessControlManager;
-        this.dlsnVersion = dlsnVersion;
-    }
-
-    /**
-     * Choose whether this heartbeat writes a control record when executed.
-     *
-     * @param writeControlRecord {@code true} to write a control record
-     * @return this operation, for chaining
-     */
-    public HeartbeatOp setWriteControlRecord(boolean writeControlRecord) {
-        this.writeControlRecord = writeControlRecord;
-        return this;
-    }
-
-    /**
-     * Execute the heartbeat.
-     *
-     * @param writer writer used for the control record, when one is requested
-     * @param sequencer source of the transaction id for the control record
-     * @param txnLock monitor held while the id is drawn and the write is issued
-     * @return future holding a success response; it carries the serialized DLSN when a
-     *         control record was written
-     */
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        // Nothing to write: acknowledge straight away.
-        if (!writeControlRecord) {
-            return Future.value(ResponseUtils.writeSuccess());
-        }
-
-        // A control record is written when the heartbeat is the first request of the
-        // recovered log segment.
-        final Future<DLSN> pendingWrite;
-        synchronized (txnLock) {
-            LogRecord controlRecord = new LogRecord(sequencer.nextId(), HEARTBEAT_DATA);
-            controlRecord.setControl();
-            pendingWrite = writer.write(controlRecord);
-        }
-        return pendingWrite.map(new AbstractFunction1<DLSN, WriteResponse>() {
-            @Override
-            public WriteResponse apply(DLSN dlsn) {
-                return ResponseUtils.writeSuccess().setDlsn(dlsn.serialize(dlsnVersion));
-            }
-        });
-    }
-
-    /**
-     * Reject the request when the access control manager does not allow acquiring the
-     * stream, otherwise defer to the parent's pre-execution checks.
-     *
-     * @throws DLException a {@link RequestDeniedException} when acquire is not allowed, or
-     *         whatever the parent's {@code preExecute()} throws
-     */
-    @Override
-    public void preExecute() throws DLException {
-        if (accessControlManager.allowAcquire(stream)) {
-            super.preExecute();
-            return;
-        }
-        deniedHeartbeatCounter.inc();
-        throw new RequestDeniedException(stream, "heartbeat");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
deleted file mode 100644
index 6ec8642..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Stream operation that gives up ownership of a log stream.
- */
-public class ReleaseOp extends AbstractWriteOp {
-    private final AccessControlManager accessControlManager;
-    private final Counter deniedReleaseCounter;
-    private final StreamManager streamManager;
-
-    /**
-     * Build a release operation for {@code stream}.
-     *
-     * @param stream name of the stream to release
-     * @param statsLogger logger used for the "release" request stat and the denied counter
-     * @param perStreamStatsLogger per-stream logger handed to {@link StreamOpStats}
-     * @param streamManager manager asked to close and remove the stream
-     * @param checksum request checksum, forwarded to the parent op
-     * @param checksumDisabledFeature checksum feature flag, forwarded to the parent op
-     * @param accessControlManager consulted by {@link #preExecute()}
-     */
-    public ReleaseOp(String stream,
-                     StatsLogger statsLogger,
-                     StatsLogger perStreamStatsLogger,
-                     StreamManager streamManager,
-                     Long checksum,
-                     Feature checksumDisabledFeature,
-                     AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "release"), checksum, checksumDisabledFeature);
-        this.streamManager = streamManager;
-        this.accessControlManager = accessControlManager;
-        this.deniedReleaseCounter =
-            new StreamOpStats(statsLogger, perStreamStatsLogger).requestDeniedCounter("release");
-    }
-
-    /**
-     * Ask the stream manager to close and remove the stream, then report success.
-     *
-     * <p>None of the three arguments is used by this operation.
-     *
-     * @param writer unused
-     * @param sequencer unused
-     * @param txnLock unused
-     * @return future holding a success response once the stream manager's future completes
-     */
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        return streamManager.closeAndRemoveAsync(streamName())
-            .map(new AbstractFunction1<Void, WriteResponse>() {
-                @Override
-                public WriteResponse apply(Void ignored) {
-                    return ResponseUtils.writeSuccess();
-                }
-            });
-    }
-
-    /**
-     * Reject the request when the access control manager does not allow releasing the
-     * stream, otherwise defer to the parent's pre-execution checks.
-     *
-     * @throws DLException a {@link RequestDeniedException} when release is not allowed, or
-     *         whatever the parent's {@code preExecute()} throws
-     */
-    @Override
-    public void preExecute() throws DLException {
-        if (accessControlManager.allowRelease(stream)) {
-            super.preExecute();
-            return;
-        }
-        deniedReleaseCounter.inc();
-        throw new RequestDeniedException(stream, "release");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
deleted file mode 100644
index 3517a63..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.streamset.Partition;
-import com.twitter.util.Future;
-import java.io.IOException;
-
-/**
- * Stream is the per-stream request handler in the DL service layer.
- *
- * <p>The collection of Streams in the proxy is managed by StreamManager.
- */
-public interface Stream {
-
- /**
- * Get the configuration of this stream.
- *
- * @return stream configuration
- */
- DynamicDistributedLogConfiguration getStreamConfiguration();
-
- /**
- * Get the last recorded owner of the stream. The value may be out of date
- * and is only meant as a hint for the client.
- * @return last known owner of the stream
- */
- String getOwner();
-
- /**
- * Get the name of the stream.
- * @return stream name
- */
- String getStreamName();
-
- /**
- * Get the partition represented by this stream.
- *
- * @return the represented partition
- */
- Partition getPartition();
-
- /**
- * Run the expensive initialization code, after the stream has been
- * allocated in StreamManager.
- *
- * @throws IOException when an exception is encountered during initialization
- */
- void initialize() throws IOException;
-
- /**
- * A second initialization step (actually Thread.start). Should probably be
- * moved into initialize().
- */
- void start();
-
- /**
- * Close the stream asynchronously.
- * @param reason the reason for closing
- * @return future satisfied once the close is complete
- */
- Future<Void> requestClose(String reason);
-
- /**
- * Delete the stream from the DL backend.
- *
- * @throws IOException when an exception is encountered while deleting the stream
- */
- void delete() throws IOException;
-
- /**
- * Execute a stream operation against this stream.
- *
- * @param op the operation to execute
- */
- void submit(StreamOp op);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
deleted file mode 100644
index 845ef21..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-/**
- * Factory that creates a {@link Stream} from a provided stream configuration {@code streamConf}.
- */
-public interface StreamFactory {
-
- /**
- * Create a stream object.
- *
- * @param name name of the stream
- * @param streamConf configuration of the stream
- * @param streamManager the manager of streams
- * @return the created stream object
- */
- Stream create(String name,
- DynamicDistributedLogConfiguration streamConf,
- StreamManager streamManager);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
deleted file mode 100644
index 2b90d55..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.FatalErrorHandler;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Timer;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.jboss.netty.util.HashedWheelTimer;
-
-/**
- * Default {@link StreamFactory}: holds the dependencies shared by all streams and hands
- * them to each {@link StreamImpl} it creates.
- */
-public class StreamFactoryImpl implements StreamFactory {
-    // Identity and configuration.
-    private final String clientId;
-    private final ServerConfiguration serverConfig;
-    private final DistributedLogConfiguration dlConfig;
-    private final StreamConfigProvider streamConfigProvider;
-
-    // Shared collaborators.
-    private final StreamOpStats streamOpStats;
-    private final FeatureProvider featureProvider;
-    private final StreamPartitionConverter streamPartitionConverter;
-    private final DistributedLogNamespace dlNamespace;
-    private final OrderedScheduler scheduler;
-    private final FatalErrorHandler fatalErrorHandler;
-
-    // Timers: the Netty timer and a Finagle timer built on top of it.
-    private final HashedWheelTimer requestTimer;
-    private final Timer futureTimer;
-
-    /**
-     * Capture the dependencies that every created stream receives.
-     *
-     * @param clientId client id passed to each stream
-     * @param streamOpStats stream operation stats passed to each stream
-     * @param serverConfig server configuration passed to each stream
-     * @param dlConfig DistributedLog configuration passed to each stream
-     * @param featureProvider feature provider passed to each stream
-     * @param streamConfigProvider stream configuration provider passed to each stream
-     * @param streamPartitionConverter converts a stream name into its partition
-     * @param dlNamespace namespace passed to each stream
-     * @param scheduler scheduler passed to each stream
-     * @param fatalErrorHandler fatal error handler passed to each stream
-     * @param requestTimer Netty timer passed to each stream; it also backs the Finagle
-     *        timer created here
-     */
-    public StreamFactoryImpl(String clientId,
-                             StreamOpStats streamOpStats,
-                             ServerConfiguration serverConfig,
-                             DistributedLogConfiguration dlConfig,
-                             FeatureProvider featureProvider,
-                             StreamConfigProvider streamConfigProvider,
-                             StreamPartitionConverter streamPartitionConverter,
-                             DistributedLogNamespace dlNamespace,
-                             OrderedScheduler scheduler,
-                             FatalErrorHandler fatalErrorHandler,
-                             HashedWheelTimer requestTimer) {
-
-        this.clientId = clientId;
-        this.serverConfig = serverConfig;
-        this.dlConfig = dlConfig;
-        this.streamConfigProvider = streamConfigProvider;
-
-        this.streamOpStats = streamOpStats;
-        this.featureProvider = featureProvider;
-        this.streamPartitionConverter = streamPartitionConverter;
-        this.dlNamespace = dlNamespace;
-        this.scheduler = scheduler;
-        this.fatalErrorHandler = fatalErrorHandler;
-
-        this.requestTimer = requestTimer;
-        this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer);
-    }
-
-    /**
-     * Create a {@link StreamImpl} for {@code name}, using the partition that the
-     * partition converter derives from the name plus the dependencies held by this factory.
-     *
-     * @param name stream name
-     * @param streamConf stream configuration
-     * @param streamManager manager of streams
-     * @return the new stream object
-     */
-    @Override
-    public Stream create(String name,
-                         DynamicDistributedLogConfiguration streamConf,
-                         StreamManager streamManager) {
-        return new StreamImpl(
-            name,
-            streamPartitionConverter.convert(name),
-            clientId,
-            streamManager,
-            streamOpStats,
-            serverConfig,
-            dlConfig,
-            streamConf,
-            featureProvider,
-            streamConfigProvider,
-            dlNamespace,
-            scheduler,
-            fatalErrorHandler,
-            requestTimer,
-            futureTimer);
-    }
-}