You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:50 UTC
[44/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
deleted file mode 100644
index 0d10eb3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Autogenerated by Thrift Compiler (0.9.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package co.cask.tephra.distributed.thrift;
-
-
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
-public enum TTransactionType implements org.apache.thrift.TEnum {
- SHORT(1),
- LONG(2);
-
- private final int value;
-
- private TTransactionType(int value) {
- this.value = value;
- }
-
- /**
- * Get the integer value of this enum value, as defined in the Thrift IDL.
- */
- public int getValue() {
- return value;
- }
-
- /**
- * Find a the enum type by its integer value, as defined in the Thrift IDL.
- * @return null if the value is not found.
- */
- public static TTransactionType findByValue(int value) {
- switch (value) {
- case 1:
- return SHORT;
- case 2:
- return LONG;
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
deleted file mode 100644
index 6de6f87..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Autogenerated by Thrift Compiler (0.9.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package co.cask.tephra.distributed.thrift;
-
-
-public enum TVisibilityLevel implements org.apache.thrift.TEnum {
- SNAPSHOT(1),
- SNAPSHOT_EXCLUDE_CURRENT(2),
- SNAPSHOT_ALL(3);
-
- private final int value;
-
- private TVisibilityLevel(int value) {
- this.value = value;
- }
-
- /**
- * Get the integer value of this enum value, as defined in the Thrift IDL.
- */
- public int getValue() {
- return value;
- }
-
- /**
- * Find a the enum type by its integer value, as defined in the Thrift IDL.
- * @return null if the value is not found.
- */
- public static TVisibilityLevel findByValue(int value) {
- switch (value) {
- case 1:
- return SNAPSHOT;
- case 2:
- return SNAPSHOT_EXCLUDE_CURRENT;
- case 3:
- return SNAPSHOT_ALL;
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
deleted file mode 100644
index 1731b0c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the
- * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet
- * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly
- * once" guarantee if no failures happen.
- *
- * NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read
- * isolation guarantees.
- *
- * Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees.
- */
-public class DetachedTxSystemClient implements TransactionSystemClient {
- // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with
- // value that is for sure bigger than the last one used before restart.
- // NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms
- // by single client
- private AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS);
-
- @Override
- public Transaction startShort() {
- long wp = getWritePointer();
- // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
- return new Transaction(
- Long.MAX_VALUE - 1, wp, new long[0], new long[0],
- Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
- }
-
- private long getWritePointer() {
- long wp = generator.incrementAndGet();
- // NOTE: using InMemoryTransactionManager.MAX_TX_PER_MS to be at least close to real one
- long now = System.currentTimeMillis();
- if (wp < now * TxConstants.MAX_TX_PER_MS) {
- // trying to advance to align with timestamp, but only once: if failed, we'll just try again later with next tx
- long advanced = now * TxConstants.MAX_TX_PER_MS;
- if (generator.compareAndSet(wp, advanced)) {
- wp = advanced;
- }
- }
- return wp;
- }
-
- @Override
- public Transaction startShort(int timeout) {
- return startShort();
- }
-
- @Override
- public Transaction startLong() {
- return startShort();
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
- return true;
- }
-
- @Override
- public boolean commit(Transaction tx) {
- return true;
- }
-
- @Override
- public void abort(Transaction tx) {
- // do nothing
- }
-
- @Override
- public boolean invalidate(long tx) {
- return true;
- }
-
- @Override
- public Transaction checkpoint(Transaction tx) {
- long newWritePointer = getWritePointer();
- LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers());
- newCheckpointPointers.add(newWritePointer);
- return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray());
- }
-
- @Override
- public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
- throw new TransactionCouldNotTakeSnapshotException(
- "Snapshot not implemented in detached transaction system client");
- }
-
- @Override
- public String status() {
- return TxConstants.STATUS_OK;
- }
-
- @Override
- public void resetState() {
- // do nothing
- }
-
- @Override
- public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
- return true;
- }
-
- @Override
- public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
- return true;
- }
-
- @Override
- public int getInvalidSize() {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
deleted file mode 100644
index feecdce..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- * Transaction server that manages transaction data for the Reactor.
- * <p>
- * Transaction server is HA, one can start multiple instances, only one of which is active and will register itself in
- * discovery service.
- * </p>
- */
-public class InMemoryTransactionService extends AbstractService {
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class);
-
- private final DiscoveryService discoveryService;
- private final String serviceName;
- protected final Provider<TransactionManager> txManagerProvider;
- private Cancellable cancelDiscovery;
- protected TransactionManager txManager;
-
- // thrift server config
- protected final String address;
- protected final int port;
- protected final int threads;
- protected final int ioThreads;
- protected final int maxReadBufferBytes;
-
- @Inject
- public InMemoryTransactionService(Configuration conf,
- DiscoveryService discoveryService,
- Provider<TransactionManager> txManagerProvider) {
-
- this.discoveryService = discoveryService;
- this.txManagerProvider = txManagerProvider;
- this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
- TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME);
-
- address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
- port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
-
- // Retrieve the number of threads for the service
- threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS,
- TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS);
- ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS,
- TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS);
-
- maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER,
- TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER);
-
- LOG.info("Configuring TransactionService" +
- ", address: " + address +
- ", port: " + port +
- ", threads: " + threads +
- ", io threads: " + ioThreads +
- ", max read buffer (bytes): " + maxReadBufferBytes);
- }
-
- protected void undoRegister() {
- if (cancelDiscovery != null) {
- cancelDiscovery.cancel();
- }
- }
-
- protected void doRegister() {
- cancelDiscovery = discoveryService.register(new Discoverable() {
- @Override
- public String getName() {
- return serviceName;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return getAddress();
- }
- });
- }
-
- protected InetSocketAddress getAddress() {
- return new InetSocketAddress(1);
- }
-
- @Override
- protected void doStart() {
- try {
- txManager = txManagerProvider.get();
- txManager.startAndWait();
- doRegister();
- LOG.info("Transaction Thrift service started successfully on " + getAddress());
- notifyStarted();
- } catch (Throwable t) {
- LOG.info("Transaction Thrift service didn't start on " + getAddress());
- notifyFailed(t);
- }
- }
-
- @Override
- protected void doStop() {
- undoRegister();
- txManager.stopAndWait();
- notifyStopped();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
deleted file mode 100644
index ba15269..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import com.google.inject.Inject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- *
- */
-public class InMemoryTxSystemClient implements TransactionSystemClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class);
-
- TransactionManager txManager;
-
- @Inject
- public InMemoryTxSystemClient(TransactionManager txmgr) {
- txManager = txmgr;
- }
-
- @Override
- public Transaction startLong() {
- return txManager.startLong();
- }
-
- @Override
- public Transaction startShort() {
- return txManager.startShort();
- }
-
- @Override
- public Transaction startShort(int timeout) {
- return txManager.startShort(timeout);
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException {
- return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
- }
-
- @Override
- public boolean commit(Transaction tx) throws TransactionNotInProgressException {
- return txManager.commit(tx);
- }
-
- @Override
- public void abort(Transaction tx) {
- txManager.abort(tx);
- }
-
- @Override
- public boolean invalidate(long tx) {
- return txManager.invalidate(tx);
- }
-
- @Override
- public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
- return txManager.checkpoint(tx);
- }
-
- @Override
- public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- try {
- boolean snapshotTaken = txManager.takeSnapshot(out);
- if (!snapshotTaken) {
- throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot.");
- }
- } finally {
- out.close();
- }
- return new ByteArrayInputStream(out.toByteArray());
- } catch (IOException e) {
- LOG.error("Snapshot could not be taken", e);
- throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
- }
- }
-
- @Override
- public String status() {
- return txManager.isRunning() ? TxConstants.STATUS_OK : TxConstants.STATUS_NOTOK;
- }
-
- @Override
- public void resetState() {
- txManager.resetState();
- }
-
- @Override
- public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
- return txManager.truncateInvalidTx(invalidTxIds);
- }
-
- @Override
- public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
- return txManager.truncateInvalidTxBefore(time);
- }
-
- @Override
- public int getInvalidSize() {
- return txManager.getInvalidSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
deleted file mode 100644
index 6f83565..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.inmemory;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * Dummy implementation of TxSystemClient. May be useful for perf testing.
- */
-public class MinimalTxSystemClient implements TransactionSystemClient {
- private long currentTxPointer = 1;
-
- @Override
- public Transaction startShort() {
- long wp = currentTxPointer++;
- // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets
- return new Transaction(
- Long.MAX_VALUE - 1, wp, new long[0], new long[0],
- Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT);
- }
-
- @Override
- public Transaction startShort(int timeout) {
- return startShort();
- }
-
- @Override
- public Transaction startLong() {
- return startShort();
- }
-
- @Override
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) {
- return true;
- }
-
- @Override
- public boolean commit(Transaction tx) {
- return true;
- }
-
- @Override
- public void abort(Transaction tx) {
- // do nothing
- }
-
- @Override
- public boolean invalidate(long tx) {
- return true;
- }
-
- @Override
- public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException {
- return tx;
- }
-
- @Override
- public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
- throw new TransactionCouldNotTakeSnapshotException("Not snapshot to take.");
- }
-
- @Override
- public String status() {
- return TxConstants.STATUS_OK;
- }
-
- @Override
- public void resetState() {
- // do nothing
- }
-
- @Override
- public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
- return true;
- }
-
- @Override
- public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException {
- return true;
- }
-
- @Override
- public int getInvalidSize() {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
deleted file mode 100644
index b7a7c59..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains in memory implementation of the transaction system v2.
- */
-package co.cask.tephra.inmemory;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
deleted file mode 100644
index 8e33b4d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import co.cask.tephra.TxConstants;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Default metrics collector implementation using <a href="http://metrics.dropwizard.io">Yammer Metrics</a>.
- *
- * <p>The reporting frequency for this collector can be configured by setting the
- * {@code data.tx.metrics.period} configuration property to the reporting frequency in seconds.
- * </p>
- */
-public class DefaultMetricsCollector extends TxMetricsCollector {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class);
-
- private final MetricRegistry metrics = new MetricRegistry();
- private JmxReporter jmxReporter;
- private ScheduledReporter reporter;
- private int reportPeriod;
- private ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap();
-
- @Override
- public void configure(Configuration conf) {
- // initialize selected output reporter
- reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT);
- LOG.info("Configured metrics report to emit every {} seconds", reportPeriod);
- // TODO: reporters should be pluggable based on injection
- jmxReporter = JmxReporter.forRegistry(metrics).build();
- reporter = Slf4jReporter.forRegistry(metrics)
- .outputTo(LoggerFactory.getLogger("tephra-metrics"))
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
- }
-
-
- @Override
- public void gauge(String metricName, int value, String... tags) {
- AtomicLong gauge = gauges.get(metricName);
- if (gauge == null) {
- final AtomicLong newValue = new AtomicLong();
- if (gauges.putIfAbsent(metricName, newValue) == null) {
- // first to set the value, need to register the metric
- metrics.register(metricName, new Gauge<Long>() {
- @Override
- public Long getValue() {
- return newValue.get();
- }
- });
- gauge = newValue;
- } else {
- // someone else set it first
- gauge = gauges.get(metricName);
- }
- }
- gauge.set(value);
- }
-
- @Override
- public void histogram(String metricName, int value) {
- metrics.histogram(metricName).update(value);
- }
-
- @Override
- public void rate(String metricName) {
- metrics.meter(metricName).mark();
- }
-
- @Override
- public void rate(String metricName, int count) {
- metrics.meter(metricName).mark(count);
- }
-
- @Override
- protected void startUp() throws Exception {
- jmxReporter.start();
- reporter.start(reportPeriod, TimeUnit.SECONDS);
- LOG.info("Started metrics reporter");
- }
-
- @Override
- protected void shutDown() throws Exception {
- jmxReporter.stop();
- reporter.stop();
- LOG.info("Stopped metrics reporter");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
deleted file mode 100644
index 3aae4e0..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Basic API for Tephra to support system metrics.
- */
-public interface MetricsCollector extends Service {
- /**
- * Report a metric as an absolute value.
- */
- void gauge(String metricName, int value, String... tags);
-
- /**
- * Report a metric as a count over a given time duration. This method uses an implicit count of 1.
- */
- void rate(String metricName);
-
- /**
- * Report a metric as a count over a given time duration.
- */
- void rate(String metricName, int count);
-
- /**
- * Report a metric calculating the distribution of the value.
- */
- void histogram(String metricName, int value);
-
- /**
- * Called before the collector service is started, allowing the collector to setup any
- * required configuration.
- */
- void configure(Configuration conf);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
deleted file mode 100644
index 7fba8e5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.metrics;
-
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Metrics Collector Class, to emit Transaction Related Metrics.
- * Note: This default implementation is a no-op and doesn't emit any metrics
- */
-public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector {
-
- @Override
- public void gauge(String metricName, int value, String... tags) {
- //no-op
- }
-
- @Override
- public void rate(String metricName) {
- // no-op
- }
-
- @Override
- public void rate(String metricName, int count) {
- // no-op
- }
-
- @Override
- public void histogram(String metricName, int value) {
- // no-op
- }
-
- @Override
- public void configure(Configuration conf) {
- // no-op
- }
-
- /* Service methods */
-
- @Override
- protected void startUp() throws Exception {
- // no-op
- }
-
- @Override
- protected void shutDown() throws Exception {
- // no-op
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/package-info.java b/tephra-core/src/main/java/co/cask/tephra/package-info.java
deleted file mode 100644
index 6f2f858..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains implementations of the transaction system v2.
- */
-package co.cask.tephra;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
deleted file mode 100644
index 173cc9f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.metrics.MetricsCollector;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending
- * this class must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
- */
-public abstract class AbstractTransactionLog implements TransactionLog {
- /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
- private static final long SLOW_APPEND_THRESHOLD = 1000L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
-
- private final AtomicLong logSequence = new AtomicLong();
- private final MetricsCollector metricsCollector;
- protected long timestamp;
- private volatile boolean initialized;
- private volatile boolean closed;
- private AtomicLong syncedUpTo = new AtomicLong();
- private List<Entry> pendingWrites = Lists.newLinkedList();
- private TransactionLogWriter writer;
-
- public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
- this.timestamp = timestamp;
- this.metricsCollector = metricsCollector;
- }
-
- /**
- * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
- * also call {@link #close()}.
- * @throws java.io.IOException If an error is encountered initializing the file writer.
- */
- public synchronized void init() throws IOException {
- if (initialized) {
- return;
- }
- this.writer = createWriter();
- this.initialized = true;
- }
-
- /**
- * Returns a log writer to be used for appending any new {@link TransactionEdit} objects.
- */
- protected abstract TransactionLogWriter createWriter() throws IOException;
-
- @Override
- public abstract String getName();
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public void append(TransactionEdit edit) throws IOException {
- long startTime = System.nanoTime();
- synchronized (this) {
- ensureAvailable();
-
- Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
- // add to pending edits
- append(entry);
- }
-
- // wait for sync to complete
- sync();
- long durationMillis = (System.nanoTime() - startTime) / 1000000L;
- if (durationMillis > SLOW_APPEND_THRESHOLD) {
- LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
- }
- }
-
- @Override
- public void append(List<TransactionEdit> edits) throws IOException {
- long startTime = System.nanoTime();
- synchronized (this) {
- ensureAvailable();
-
- for (TransactionEdit edit : edits) {
- Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
- // add to pending edits
- append(entry);
- }
- }
-
- // wait for sync to complete
- sync();
- long durationMillis = (System.nanoTime() - startTime) / 1000000L;
- if (durationMillis > SLOW_APPEND_THRESHOLD) {
- LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
- }
- }
-
- private void ensureAvailable() throws IOException {
- if (closed) {
- throw new IOException("Log " + getName() + " is already closed, cannot append!");
- }
- if (!initialized) {
- init();
- }
- }
-
- /*
- * Appends new writes to the pendingWrites. It is better to keep it in
- * our own queue rather than writing it to the HDFS output stream because
- * HDFSOutputStream.writeChunk is not lightweight at all.
- */
- private void append(Entry e) throws IOException {
- pendingWrites.add(e);
- }
-
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- private List<Entry> getPendingWrites() {
- synchronized (this) {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<>();
- return save;
- }
- }
-
- private void sync() throws IOException {
- // writes out pending entries to the transaction log
- TransactionLogWriter tmpWriter = null;
- long latestSeq = 0;
- int entryCount = 0;
- synchronized (this) {
- if (closed) {
- return;
- }
- // prevent writer being dereferenced
- tmpWriter = writer;
-
- List<Entry> currentPending = getPendingWrites();
- if (!currentPending.isEmpty()) {
- tmpWriter.commitMarker(currentPending.size());
- }
-
- // write out all accumulated entries to log.
- for (Entry e : currentPending) {
- tmpWriter.append(e);
- entryCount++;
- latestSeq = Math.max(latestSeq, e.getKey().get());
- }
- }
-
- long lastSynced = syncedUpTo.get();
- // someone else might have already synced our edits, avoid double syncing
- if (lastSynced < latestSeq) {
- tmpWriter.sync();
- metricsCollector.histogram("wal.sync.size", entryCount);
- syncedUpTo.compareAndSet(lastSynced, latestSeq);
- }
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
- // perform a final sync if any outstanding writes
- if (!pendingWrites.isEmpty()) {
- sync();
- }
- // NOTE: writer is lazy-inited, so it can be null
- if (writer != null) {
- this.writer.close();
- }
- this.closed = true;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public abstract TransactionLogReader getReader() throws IOException;
-
- /**
- * Represents an entry in the transaction log. Each entry consists of a key, generated from an incrementing sequence
- * number, and a value, the {@link TransactionEdit} being stored.
- */
- public static class Entry implements Writable {
- private LongWritable key;
- private TransactionEdit edit;
-
- // for Writable
- public Entry() {
- this.key = new LongWritable();
- this.edit = new TransactionEdit();
- }
-
- public Entry(LongWritable key, TransactionEdit edit) {
- this.key = key;
- this.edit = edit;
- }
-
- public LongWritable getKey() {
- return this.key;
- }
-
- public TransactionEdit getEdit() {
- return this.edit;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.key.write(out);
- this.edit.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.key.readFields(in);
- this.edit.readFields(in);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
deleted file mode 100644
index 682435e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Common base class for all transaction storage implementations. This implements logic to prefix a snapshot
- * with a version when encoding, and to select the correct codec for decoding based on this version prefix.
- */
-public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
-
- protected final SnapshotCodecProvider codecProvider;
-
- protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) {
- this.codecProvider = codecProvider;
- }
-
- @Override
- public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
- codecProvider.encode(out, snapshot);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
deleted file mode 100644
index c1796bd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Ints;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.SequenceFile;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above.
- */
-public class CommitMarkerCodec implements Closeable {
- private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8);
- private final DataOutputBuffer rawKey;
- private final DataOutputBuffer rawValue;
- private SequenceFile.ValueBytes valueBytes;
-
- public CommitMarkerCodec() {
- this.rawKey = new DataOutputBuffer();
- this.rawValue = new DataOutputBuffer();
- }
-
- @Override
- public void close() throws IOException {
- rawKey.close();
- rawValue.close();
- }
-
- // 1. Returns the count when the marker is written correctly
- // 2. If data is incorrect (e.g. incorrect key, mismatch in key/value/record length), we throw IOException
- // since this indicates corrupted log file
- // 3. If data is incomplete, then we throw EOFException which is handled gracefully by the calling method
- // since we can recover without any consequence
- public int readMarker(SequenceFile.Reader reader) throws IOException {
- if (valueBytes == null) {
- valueBytes = reader.createValueBytes();
- }
- rawKey.reset();
- rawValue.reset();
-
- // valueBytes need not be reset since nextRaw call does it (and it is a private method)
- int status = reader.nextRaw(rawKey, valueBytes);
-
- // if we reach EOF, return -1
- if (status == -1) {
- return -1;
- }
-
- // Check if the marker key is valid and return the count
- if (isMarkerValid()) {
- valueBytes.writeUncompressedBytes(rawValue);
- rawValue.flush();
- // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes
- return Ints.fromByteArray(rawValue.getData());
- }
-
- // EOF not reached and marker is not valid, so throw an IOException since we can't make progress
- throw new IOException(String.format("Invalid key for num entries appended found %s, expected : %s",
- new String(rawKey.getData()), TxConstants.TransactionLog.NUM_ENTRIES_APPENDED));
- }
-
- private boolean isMarkerValid() {
- // rawKey should have the expected length and the matching bytes should start at index 0
- return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0;
- }
-
- public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException {
- writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count));
- }
-
- @VisibleForTesting
- static final class CommitEntriesCount implements SequenceFile.ValueBytes {
- private final int numEntries;
-
- public CommitEntriesCount(int numEntries) {
- this.numEntries = numEntries;
- }
-
- @Override
- public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
- outStream.write(Ints.toByteArray(numEntries));
- }
-
- @Override
- public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
- throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed.");
- }
-
- @Override
- public int getSize() {
- return Ints.BYTES;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
deleted file mode 100644
index bed90c2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * Allows reading from and writing to a transaction write-ahead log stored in HDFS.
- */
-public class HDFSTransactionLog extends AbstractTransactionLog {
- private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class);
-
- private final FileSystem fs;
- private final Configuration hConf;
- private final Path logPath;
-
- /**
- * Creates a new HDFS-backed write-ahead log for storing transaction state.
- * @param fs Open FileSystem instance for opening log files in HDFS.
- * @param hConf HDFS cluster configuration.
- * @param logPath Path to the log file.
- */
- public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
- final Path logPath, long timestamp, MetricsCollector metricsCollector) {
- super(timestamp, metricsCollector);
- this.fs = fs;
- this.hConf = hConf;
- this.logPath = logPath;
- }
-
- @Override
- protected TransactionLogWriter createWriter() throws IOException {
- return new LogWriter(fs, hConf, logPath);
- }
-
- @Override
- public String getName() {
- return logPath.getName();
- }
-
- @Override
- public TransactionLogReader getReader() throws IOException {
- FileStatus status = fs.getFileStatus(logPath);
- long length = status.getLen();
-
- TransactionLogReader reader = null;
- // check if this file needs to be recovered due to failure
- // Check for possibly empty file. With appends, currently Hadoop reports a
- // zero length even if the file has been sync'd. Revisit if HDFS-376 or
- // HDFS-878 is committed.
- if (length <= 0) {
- LOG.warn("File " + logPath + " might be still open, length is 0");
- }
-
- try {
- HDFSUtil hdfsUtil = new HDFSUtil();
- hdfsUtil.recoverFileLease(fs, logPath, hConf);
- try {
- FileStatus newStatus = fs.getFileStatus(logPath);
- LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
- SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
- reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
- } catch (EOFException e) {
- if (length <= 0) {
- // TODO should we ignore an empty, not-last log file if skip.errors
- // is false? Either way, the caller should decide what to do. E.g.
- // ignore if this is the last log in sequence.
- // TODO is this scenario still possible if the log has been
- // recovered (i.e. closed)
- LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
- return null;
- } else {
- // EOFException being ignored
- return null;
- }
- }
- } catch (IOException e) {
- throw e;
- }
- return reader;
- }
-
- @VisibleForTesting
- static final class LogWriter implements TransactionLogWriter {
- private final SequenceFile.Writer internalWriter;
- public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
- // TODO: retry a few times to ride over transient failures?
- SequenceFile.Metadata metadata = new SequenceFile.Metadata();
- metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
- new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
-
- this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class,
- SequenceFile.CompressionType.NONE, null, null, metadata);
- LOG.debug("Created a new TransactionLog writer for " + logPath);
- }
-
- @Override
- public void append(Entry entry) throws IOException {
- internalWriter.append(entry.getKey(), entry.getEdit());
- }
-
- @Override
- public void commitMarker(int count) throws IOException {
- CommitMarkerCodec.writeMarker(internalWriter, count);
- }
-
- @Override
- public void sync() throws IOException {
- internalWriter.syncFs();
- }
-
- @Override
- public void close() throws IOException {
- internalWriter.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
deleted file mode 100644
index c407945..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.base.Supplier;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-/**
- * Provides the correct version of {@link TransactionLogReader}, based on the log's version metadata,
- * to read HDFS Transaction Logs.
- */
-public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> {
- private final SequenceFile.Reader reader;
- private final byte version;
- private TransactionLogReader logReader;
-
- public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) {
- this.reader = reader;
- Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY));
- this.version = versionInfo == null ? 1 : Byte.parseByte(versionInfo.toString());
- }
-
- @Override
- public TransactionLogReader get() {
- if (logReader != null) {
- return logReader;
- }
-
- switch (version) {
- case 2:
- logReader = new HDFSTransactionLogReaderV2(reader);
- return logReader;
- case 1:
- logReader = new HDFSTransactionLogReaderV1(reader);
- return logReader;
- default:
- throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
deleted file mode 100644
index cb2ce7c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/**
- * {@link TransactionLogReader} that can read v1 (default) version of Transaction logs. The logs are expected to
- * have a sequence of {@link TransactionEdit}s.
- */
-public class HDFSTransactionLogReaderV1 implements TransactionLogReader {
- private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class);
- private final SequenceFile.Reader reader;
- private final LongWritable key;
- private boolean closed;
-
- public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) {
- this.reader = reader;
- this.key = new LongWritable();
- }
-
- @Override
- public TransactionEdit next() throws IOException {
- return next(new TransactionEdit());
- }
-
- @Override
- public TransactionEdit next(TransactionEdit reuse) throws IOException {
- if (closed) {
- return null;
- }
-
- try {
- boolean successful = reader.next(key, reuse);
- return successful ? reuse : null;
- } catch (EOFException e) {
- LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
- return null;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (closed) {
- return;
- }
- reader.close();
- closed = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
deleted file mode 100644
index 6981a3b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-/**
- * {@link TransactionLogReader} that can read v2 version of Transaction logs. The logs are expected to
- * have commit markers that indicate the size of the batch of {@link TransactionEdit}s (which follows the marker)
- * that were synced together. If the expected number of {@link TransactionEdit}s is not present then that set of
- * {@link TransactionEdit}s is discarded.
- */
-public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
-
-  private final SequenceFile.Reader reader;
-  // edits of the most recently read batch that have not been handed out yet
-  private final Queue<TransactionEdit> transactionEdits;
-  private final CommitMarkerCodec commitMarkerCodec;
-  // scratch key handed to the sequence file reader; its value is never inspected here
-  private final LongWritable key;
-
-  private boolean closed;
-
-  /**
-   * @param reader the sequence file reader to read markers and edits from; it is closed by {@link #close()}
-   */
-  public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) {
-    this.reader = reader;
-    this.transactionEdits = new ArrayDeque<>();
-    this.key = new LongWritable();
-    this.commitMarkerCodec = new CommitMarkerCodec();
-  }
-
-  /**
-   * Closes the commit marker codec and then the underlying reader. The reader is closed even if closing the
-   * codec fails. Calls made after a successful close do nothing.
-   */
-  @Override
-  public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    try {
-      commitMarkerCodec.close();
-    } finally {
-      reader.close();
-      closed = true;
-    }
-  }
-
-  /**
-   * @return the next edit, or {@code null} if the reader is closed or no further complete batch is available
-   */
-  @Override
-  public TransactionEdit next() throws IOException {
-    return next(null);
-  }
-
-  /**
-   * Returns the next edit of the current batch, reading the next batch from the log when the current one is
-   * exhausted.
-   *
-   * @param reuse ignored by this implementation; edits are always returned as newly allocated instances
-   * @return the next edit, or {@code null} if the reader is closed or no further complete batch is available
-   */
-  @Override
-  public TransactionEdit next(TransactionEdit reuse) throws IOException {
-    if (closed) {
-      return null;
-    }
-
-    if (transactionEdits.isEmpty()) {
-      // Fetch the 'marker' and read 'marker' number of edits
-      populateTransactionEdits();
-    }
-    // poll() yields null when no complete batch could be read
-    return transactionEdits.poll();
-  }
-
-  /**
-   * Reads one commit marker and the batch of edits it announces into {@link #transactionEdits}. If the log ends
-   * before the marker or before the whole batch could be read, the queue is left empty.
-   */
-  private void populateTransactionEdits() throws IOException {
-    // read the marker to determine numEntries to read.
-    int numEntries = 0;
-    try {
-      // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
-      // ignore this
-      numEntries = commitMarkerCodec.readMarker(reader);
-    } catch (EOFException e) {
-      LOG.warn("Reached EOF in log while trying to read commit marker", e);
-    }
-
-    for (int i = 0; i < numEntries; i++) {
-      TransactionEdit edit = new TransactionEdit();
-      try {
-        if (reader.next(key, edit)) {
-          transactionEdits.add(edit);
-        } else {
-          throw new EOFException("Attempt to read TransactionEdit failed.");
-        }
-      } catch (EOFException e) {
-        // we have reached EOF before reading back numEntries, we clear the partial list and return.
-        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
-                 numEntries, transactionEdits.size(), e);
-        transactionEdits.clear();
-        // stop here: reading on would repeat the warning for every remaining entry and could enqueue edits
-        // belonging to a batch that is already known to be incomplete
-        return;
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
deleted file mode 100644
index bc7fb9f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.persist;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.CountingInputStream;
-import com.google.common.primitives.Longs;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import javax.annotation.Nullable;
-
-/**
- * Handles persistence of transaction snapshot and logs to a directory in HDFS.
- *
- * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
- * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering. Snapshot files
- * are written with the filename "snapshot.&lt;timestamp&gt;". Transaction log files are written with the filename
- * "txlog.&lt;timestamp&gt;".
- */
-public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
-
-  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
-  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
-  private static final String LOG_FILE_PREFIX = "txlog.";
-
-  // accepts completed snapshot files only; in-progress files start with a different prefix
-  private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
-    }
-  };
-
-  // buffer size used for HDFS reads and writes
-  private static final int BUFFER_SIZE = 16384;
-
-  private final Configuration hConf;
-  private final String configuredSnapshotDir;
-  private final MetricsCollector metricsCollector;
-  // both are assigned in startUp()
-  private FileSystem fs;
-  private Path snapshotDir;
-
-  /**
-   * @param hConf configuration to read the snapshot directory and HDFS user from
-   * @param codecProvider codecs used to encode and decode snapshots
-   * @param metricsCollector collector handed to the transaction logs opened by this storage
-   */
-  @Inject
-  public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
-                                     MetricsCollector metricsCollector) {
-    super(codecProvider);
-    this.hConf = hConf;
-    this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
-    this.metricsCollector = metricsCollector;
-  }
-
-  /**
-   * Opens a new {@link FileSystem} instance and resolves the snapshot directory.
-   *
-   * @throws IllegalStateException if no snapshot directory is configured
-   */
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkState(configuredSnapshotDir != null,
-                             "Snapshot directory is not configured. Please set " +
-                               TxConstants.Manager.CFG_TX_SNAPSHOT_DIR + " in configuration.");
-    String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
-    if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
-      if (hdfsUser != null && LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
-                  TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
-      }
-      // NOTE: we can start multiple times this storage. As hdfs uses per-jvm cache, we want to create new fs instead
-      // of getting closed one
-      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
-    } else {
-      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
-    }
-    snapshotDir = new Path(configuredSnapshotDir);
-    LOG.info("Using snapshot dir {}", snapshotDir);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    fs.close();
-  }
-
-  /**
-   * Writes the snapshot to a temporary file and then renames it to its final name, so that readers listing the
-   * directory never pick up a partially written snapshot.
-   *
-   * @throws IOException if the snapshot cannot be written or the temporary file cannot be renamed into place
-   */
-  @Override
-  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
-    // create a temporary file, and save the snapshot
-    Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
-
-    FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE);
-    // encode the snapshot and stream the serialized version to the file
-    try {
-      codecProvider.encode(out, snapshot);
-    } finally {
-      out.close();
-    }
-
-    // move the temporary file into place with the correct filename
-    Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
-    // rename reports failure through its return value; surface it instead of logging a false success
-    if (!fs.rename(snapshotTmpFile, finalFile)) {
-      throw new IOException("Failed to rename temporary snapshot file " + snapshotTmpFile + " to " + finalFile);
-    }
-    LOG.debug("Completed snapshot to file {}", finalFile);
-  }
-
-  /**
-   * @return the decoded most recent snapshot, or {@code null} if the snapshot directory contains no snapshot
-   */
-  @Override
-  public TransactionSnapshot getLatestSnapshot() throws IOException {
-    InputStream in = getLatestSnapshotInputStream();
-    if (in == null) {
-      return null;
-    }
-    try {
-      return readSnapshotInputStream(in);
-    } finally {
-      in.close();
-    }
-  }
-
-  /**
-   * @return the visibility state decoded from the most recent snapshot, or {@code null} if the snapshot directory
-   *         contains no snapshot
-   */
-  @Override
-  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
-    InputStream in = getLatestSnapshotInputStream();
-    if (in == null) {
-      return null;
-    }
-    try {
-      return readTransactionVisibilityStateFromInputStream(in);
-    } finally {
-      in.close();
-    }
-  }
-
-  /**
-   * Opens the snapshot file with the highest timestamp. The caller is responsible for closing the stream.
-   *
-   * @return the opened stream, or {@code null} if there is no snapshot file
-   */
-  private InputStream getLatestSnapshotInputStream() throws IOException {
-    TimestampedFilename[] snapshots = listSnapshotFiles();
-    Arrays.sort(snapshots);
-    if (snapshots.length > 0) {
-      // last is the most recent
-      return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE);
-    }
-
-    LOG.info("No snapshot files found in {}", snapshotDir);
-    return null;
-  }
-
-  // Decodes a full snapshot from the stream and logs how many bytes were consumed. Does not close the stream.
-  private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
-    CountingInputStream countingIn = new CountingInputStream(in);
-    TransactionSnapshot snapshot = codecProvider.decode(countingIn);
-    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
-    return snapshot;
-  }
-
-  // Decodes only the visibility state from the stream and logs how many bytes were consumed. Does not close the
-  // stream.
-  private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in) throws IOException {
-    CountingInputStream countingIn = new CountingInputStream(in);
-    TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn);
-    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
-    return state;
-  }
-
-  // Opens the given file, decodes it as a snapshot and closes it again.
-  private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
-    FSDataInputStream in = fs.open(filePath, BUFFER_SIZE);
-    try {
-      return readSnapshotInputStream(in);
-    } finally {
-      in.close();
-    }
-  }
-
-  // Lists the completed snapshot files in the snapshot directory, in the order returned by the file system.
-  private TimestampedFilename[] listSnapshotFiles() throws IOException {
-    FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
-    TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length];
-    for (int i = 0; i < snapshotFileStatuses.length; i++) {
-      snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath());
-    }
-    return snapshotFiles;
-  }
-
-  /**
-   * Deletes all but the most recent snapshots.
-   *
-   * @param numberToKeep number of most recent snapshots to retain; must be at least 1, since the timestamp of the
-   *                     oldest retained snapshot is looked up at index {@code numberToKeep - 1}
-   * @return the timestamp of the oldest snapshot that was retained, or -1 if there are no snapshots
-   */
-  @Override
-  public long deleteOldSnapshots(int numberToKeep) throws IOException {
-    TimestampedFilename[] snapshots = listSnapshotFiles();
-    if (snapshots.length == 0) {
-      return -1;
-    }
-    // newest first, so the snapshots to retain occupy indices [0, numberToKeep)
-    Arrays.sort(snapshots, Collections.reverseOrder());
-    if (snapshots.length <= numberToKeep) {
-      // nothing to remove, oldest timestamp is the last snapshot
-      return snapshots[snapshots.length - 1].getTimestamp();
-    }
-
-    int toRemoveCount = snapshots.length - numberToKeep;
-    for (int i = numberToKeep; i < snapshots.length; i++) {
-      Path oldSnapshot = snapshots[i].getPath();
-      LOG.debug("Removing old snapshot file {}", oldSnapshot);
-      // report failed deletes the same way deleteLogsOlderThan() does
-      if (!fs.delete(oldSnapshot, false)) {
-        LOG.error("Failed to delete snapshot file {}", oldSnapshot);
-      }
-    }
-    long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp();
-    LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp);
-    return oldestTimestamp;
-  }
-
-  /**
-   * @return the file names (without directory) of the completed snapshots in the snapshot directory
-   */
-  @Override
-  public List<String> listSnapshots() throws IOException {
-    FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
-    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable FileStatus input) {
-        return input.getPath().getName();
-      }
-    });
-  }
-
-  /**
-   * Returns the transaction logs whose file timestamp is greater than or equal to the given timestamp.
-   *
-   * @param timestamp lower bound (inclusive) of the log file timestamps to include
-   * @return the matching logs in ascending timestamp order
-   */
-  @Override
-  public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
-    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE));
-    TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length];
-    for (int i = 0; i < statuses.length; i++) {
-      timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath());
-    }
-    // do not depend on the listing order of the underlying file system
-    Arrays.sort(timestampedFiles);
-
-    // build the list eagerly: a lazily transformed view would create a new TransactionLog object on every access,
-    // so a caller could never get hold of the same log instance twice
-    List<TransactionLog> logs = Lists.newArrayListWithCapacity(timestampedFiles.length);
-    for (TimestampedFilename logFile : timestampedFiles) {
-      logs.add(openLog(logFile.getPath(), logFile.getTimestamp()));
-    }
-    return logs;
-  }
-
-  /**
-   * @param timestamp timestamp used both as the file name suffix and as the timestamp of the returned log
-   * @return a transaction log for the file "txlog.&lt;timestamp&gt;" in the snapshot directory
-   */
-  @Override
-  public TransactionLog createLog(long timestamp) throws IOException {
-    Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
-    return openLog(newLog, timestamp);
-  }
-
-  private TransactionLog openLog(Path path, long timestamp) {
-    return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
-  }
-
-  /**
-   * Deletes the transaction logs whose file timestamp is lower than the given timestamp. Logs that cannot be
-   * deleted are reported in the log output and otherwise skipped.
-   *
-   * @param timestamp upper bound (exclusive) of the log file timestamps to delete
-   */
-  @Override
-  public void deleteLogsOlderThan(long timestamp) throws IOException {
-    FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp));
-    int removedCnt = 0;
-    for (FileStatus status : statuses) {
-      LOG.debug("Removing old transaction log {}", status.getPath());
-      if (fs.delete(status.getPath(), false)) {
-        removedCnt++;
-      } else {
-        LOG.error("Failed to delete transaction log file {}", status.getPath());
-      }
-    }
-    LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp);
-  }
-
-  /**
-   * Creates the snapshot directory if it does not exist yet.
-   *
-   * @throws IllegalStateException if the configured path exists but is not a directory
-   */
-  @Override
-  public void setupStorage() throws IOException {
-    if (!fs.exists(snapshotDir)) {
-      LOG.info("Creating snapshot dir at {}", snapshotDir);
-      fs.mkdirs(snapshotDir);
-    } else {
-      Preconditions.checkState(fs.isDirectory(snapshotDir),
-                               "Configured snapshot directory " + snapshotDir + " is not a directory!");
-    }
-  }
-
-  /**
-   * @return the file names (without directory) of all transaction logs in the snapshot directory
-   */
-  @Override
-  public List<String> listLogs() throws IOException {
-    FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE));
-    return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() {
-      @Nullable
-      @Override
-      public String apply(@Nullable FileStatus input) {
-        return input.getPath().getName();
-      }
-    });
-  }
-
-  @Override
-  public String getLocation() {
-    return snapshotDir.toString();
-  }
-
-  /**
-   * Accepts transaction log files named "txlog.&lt;timestamp&gt;" whose timestamp lies in a given range.
-   */
-  private static class LogFileFilter implements PathFilter {
-    // starting time of files to include (inclusive)
-    private final long startTime;
-    // ending time of files to include (exclusive)
-    private final long endTime;
-
-    public LogFileFilter(long startTime, long endTime) {
-      this.startTime = startTime;
-      this.endTime = endTime;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      if (path.getName().startsWith(LOG_FILE_PREFIX)) {
-        String[] parts = path.getName().split("\\.");
-        if (parts.length == 2) {
-          try {
-            long fileTime = Long.parseLong(parts[1]);
-            return fileTime >= startTime && fileTime < endTime;
-          } catch (NumberFormatException ignored) {
-            // a non-numeric suffix means this is not one of our log files: report it and reject the path
-            LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName());
-          }
-        }
-      }
-      return false;
-    }
-  }
-
-  /**
-   * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both
-   * snapshot and transaction log filenames. Instances order by prefix first and by timestamp second.
-   */
-  private static class TimestampedFilename implements Comparable<TimestampedFilename> {
-    private final Path path;
-    private final String prefix;
-    private final long timestamp;
-
-    /**
-     * @throws IllegalArgumentException if the file name does not consist of exactly two dot-separated parts, or
-     *                                  (as a {@link NumberFormatException}) if the second part is not a number
-     */
-    public TimestampedFilename(Path path) {
-      this.path = path;
-      String[] parts = path.getName().split("\\.");
-      if (parts.length != 2) {
-        throw new IllegalArgumentException("Filename " + path.getName() +
-                                           " did not match the expected pattern prefix.timestamp");
-      }
-      prefix = parts[0];
-      timestamp = Long.parseLong(parts[1]);
-    }
-
-    public Path getPath() {
-      return path;
-    }
-
-    public String getPrefix() {
-      return prefix;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public int compareTo(TimestampedFilename other) {
-      int res = prefix.compareTo(other.getPrefix());
-      if (res == 0) {
-        res = Longs.compare(timestamp, other.getTimestamp());
-      }
-      return res;
-    }
-  }
-
-  // TODO move this out as a separate command line tool
-  private enum CLIMode { SNAPSHOT, TXLOG }
-
-  /**
-   * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout.
-   *
-   * Supports the following options:
-   * -s read snapshot state (defaults to the latest)
-   * -l read a transaction log
-   * [filename] reads the given file
-   * @param args command line arguments: a mode flag ({@code -s} or {@code -l}), optionally {@code -h}, and the
-   *             names of the files to read
-   */
-  public static void main(String[] args) {
-    List<String> filenames = Lists.newArrayList();
-    CLIMode mode = null;
-    for (String arg : args) {
-      if ("-s".equals(arg)) {
-        mode = CLIMode.SNAPSHOT;
-      } else if ("-l".equals(arg)) {
-        mode = CLIMode.TXLOG;
-      } else if ("-h".equals(arg)) {
-        // prints the usage and exits the JVM
-        printUsage(null);
-      } else {
-        filenames.add(arg);
-      }
-    }
-
-    if (mode == null) {
-      printUsage("ERROR: Either -s or -l is required to set mode.", 1);
-    }
-
-    Configuration config = new ConfigurationFactory().get();
-
-    // Use the no-op metrics collector. We are being run as a command line tool, so there are no relevant metrics
-    // to report
-    HDFSTransactionStateStorage storage =
-      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
-    storage.startAndWait();
-    try {
-      switch (mode) {
-        case SNAPSHOT:
-          try {
-            if (filenames.isEmpty()) {
-              TransactionSnapshot snapshot = storage.getLatestSnapshot();
-              printSnapshot(snapshot);
-            }
-            for (String file : filenames) {
-              Path path = new Path(file);
-              TransactionSnapshot snapshot = storage.readSnapshotFile(path);
-              printSnapshot(snapshot);
-              System.out.println();
-            }
-          } catch (IOException ioe) {
-            System.err.println("Error reading snapshot files: " + ioe.getMessage());
-            ioe.printStackTrace();
-            System.exit(1);
-          }
-          break;
-        case TXLOG:
-          if (filenames.isEmpty()) {
-            printUsage("ERROR: At least one transaction log filename is required!", 1);
-          }
-          for (String file : filenames) {
-            TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file));
-            TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp());
-            printLog(log);
-            System.out.println();
-          }
-          break;
-      }
-    } finally {
-      storage.stop();
-    }
-  }
-
-  // Prints the usage, preceded by the given message if it is not null, and exits with status 0.
-  private static void printUsage(String message) {
-    printUsage(message, 0);
-  }
-
-  // Prints the usage, preceded by the given message if it is not null, and exits with the given status.
-  private static void printUsage(String message, int exitCode) {
-    if (message != null) {
-      System.out.println(message);
-    }
-    System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]");
-    System.out.println();
-    System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)");
-    System.out.println("\t-l\tRead files as transaction logs [filename is required]");
-    System.out.println("\t-h\tPrint this message");
-    System.exit(exitCode);
-  }
-
-  private static void printSnapshot(TransactionSnapshot snapshot) {
-    Date snapshotDate = new Date(snapshot.getTimestamp());
-    System.out.println("TransactionSnapshot at " + snapshotDate.toString());
-    System.out.println("\t" + snapshot.toString());
-  }
-
-  // Prints every edit of the given log to stdout, numbered from 0. Read errors are reported on stderr.
-  private static void printLog(TransactionLog log) {
-    try {
-      System.out.println("TransactionLog " + log.getName());
-      TransactionLogReader reader = log.getReader();
-      try {
-        TransactionEdit edit;
-        long seq = 0;
-        while ((edit = reader.next()) != null) {
-          System.out.println(String.format(" %d: %s", seq++, edit.toString()));
-        }
-      } finally {
-        // release the underlying file handle even if reading fails part way through
-        reader.close();
-      }
-    } catch (IOException ioe) {
-      System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage());
-      ioe.printStackTrace();
-    }
-  }
-}