Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/10/12 20:12:20 UTC
[03/52] [abbrv] hadoop git commit: HDFS-10957. Retire BKJM from trunk (Vinayakumar B)
HDFS-10957. Retire BKJM from trunk (Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31195488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31195488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31195488
Branch: refs/heads/HADOOP-13037
Commit: 311954883f714973784432589896553eb320b597
Parents: 35b9d7d
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Oct 6 19:28:25 2016 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Oct 6 19:28:25 2016 +0530
----------------------------------------------------------------------
.../src/contrib/bkjournal/README.txt | 66 --
.../dev-support/findbugsExcludeFile.xml | 5 -
.../hadoop-hdfs/src/contrib/bkjournal/pom.xml | 175 ----
.../bkjournal/BookKeeperEditLogInputStream.java | 264 -----
.../BookKeeperEditLogOutputStream.java | 188 ----
.../bkjournal/BookKeeperJournalManager.java | 893 -----------------
.../contrib/bkjournal/CurrentInprogress.java | 160 ---
.../bkjournal/EditLogLedgerMetadata.java | 217 ----
.../hadoop/contrib/bkjournal/MaxTxId.java | 103 --
.../bkjournal/src/main/proto/bkjournal.proto | 49 -
.../hadoop/contrib/bkjournal/BKJMUtil.java | 184 ----
.../bkjournal/TestBookKeeperAsHASharedDir.java | 414 --------
.../bkjournal/TestBookKeeperConfiguration.java | 174 ----
.../bkjournal/TestBookKeeperEditLogStreams.java | 92 --
.../bkjournal/TestBookKeeperHACheckpoints.java | 109 --
.../bkjournal/TestBookKeeperJournalManager.java | 984 -------------------
.../TestBookKeeperSpeculativeRead.java | 167 ----
.../bkjournal/TestBootstrapStandbyWithBKJM.java | 170 ----
.../bkjournal/TestCurrentInprogress.java | 160 ---
.../hdfs/server/namenode/FSEditLogTestUtil.java | 40 -
.../src/test/resources/log4j.properties | 55 --
.../markdown/HDFSHighAvailabilityWithNFS.md | 114 ---
hadoop-hdfs-project/pom.xml | 1 -
hadoop-project/pom.xml | 6 -
24 files changed, 4790 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
deleted file mode 100644
index 7f67226..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
+++ /dev/null
@@ -1,66 +0,0 @@
-This module provides a BookKeeper backend for HDFS Namenode write
-ahead logging.
-
-BookKeeper is a highly available distributed write ahead logging
-system. For more details, see
-
- http://zookeeper.apache.org/bookkeeper
-
--------------------------------------------------------------------------------
-How do I build?
-
- To generate the distribution packages for BK journal, do the
- following.
-
- $ mvn clean package -Pdist
-
- This will generate a jar with all the dependencies needed by the journal
- manager:
-
- target/hadoop-hdfs-bkjournal-<VERSION>.jar
-
- Note that the -Pdist part of the build command is important, as otherwise
- the dependencies would not be packaged in the jar.
-
--------------------------------------------------------------------------------
-How do I use the BookKeeper Journal?
-
- To run an HDFS namenode using BookKeeper as a backend, copy the bkjournal
- jar, generated above, into the lib directory of hdfs. In the standard
- distribution of HDFS, this is at $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
- cp target/hadoop-hdfs-bkjournal-<VERSION>.jar \
- $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
- Then, in hdfs-site.xml, set the following properties.
-
- <property>
- <name>dfs.namenode.edits.dir</name>
- <value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
- </property>
-
- <property>
- <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
- <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
- </property>
-
- In this example, the namenode is configured to use two write-ahead
- logging devices. One writes to BookKeeper and the other to a local
- file system. At the moment it is not possible to write only to
- BookKeeper, as the resource checker currently checks explicitly
- for local disks.
-
- The example above configures the namenode to look for the journal
- metadata at the path /bkjournal on a standalone zookeeper ensemble
- at localhost:2181. To configure a multi-host zookeeper ensemble,
- separate the hosts with semicolons. For example, if you have 3
- zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
- would specify this with
-
- bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
-
- The final part, /bkjournal, specifies the znode in zookeeper where
- ledger metadata will be stored. Administrators can set this to anything
- they wish.
-
-
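(Editorial note for readers of this archived diff: the removed README gives the multi-host ensemble URI separately from the property snippets. Combined, the hdfs-site.xml configuration it describes would have looked roughly like the following; hostnames zk1, zk2 and zk3 are illustrative.)

```xml
<property>
  <name>dfs.namenode.edits.dir</name>
  <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal,file:///path/for/edits</value>
</property>
<property>
  <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
  <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
</property>
```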
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
deleted file mode 100644
index 45c3a75..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
+++ /dev/null
@@ -1,5 +0,0 @@
-<FindBugsFilter>
- <Match>
- <Class name="~org.apache.hadoop.contrib.bkjournal.BKJournalProtos.*" />
- </Match>
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
deleted file mode 100644
index 7fb6c24..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-project</artifactId>
- <version>3.0.0-alpha2-SNAPSHOT</version>
- <relativePath>../../../../../hadoop-project</relativePath>
- </parent>
-
- <groupId>org.apache.hadoop.contrib</groupId>
- <artifactId>hadoop-hdfs-bkjournal</artifactId>
- <version>3.0.0-alpha2-SNAPSHOT</version>
- <description>Apache Hadoop HDFS BookKeeper Journal</description>
- <name>Apache Hadoop HDFS BookKeeper Journal</name>
- <packaging>jar</packaging>
-
- <properties>
- <hadoop.component>hdfs</hadoop.component>
- <hadoop.common.build.dir>${basedir}/../../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>bookkeeper-server</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-maven-plugins</artifactId>
- <executions>
- <execution>
- <id>compile-protoc</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>protoc</goal>
- </goals>
- <configuration>
- <protocVersion>${protobuf.version}</protocVersion>
- <protocCommand>${protoc.path}</protocCommand>
- <imports>
- <param>${basedir}/../../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
- <param>${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto</param>
- <param>${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto</param>
- <param>${basedir}/src/main/proto</param>
- </imports>
- <source>
- <directory>${basedir}/src/main/proto</directory>
- <includes>
- <include>bkjournal.proto</include>
- </includes>
- </source>
- <output>${project.build.directory}/generated-sources/java</output>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>dev-support/findbugsExcludeFile.xml</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>dist</id>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>dist</id>
- <phase>package</phase>
- <goals>
- <goal>copy</goal>
- </goals>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>bookkeeper-server</artifactId>
- <type>jar</type>
- </artifactItem>
- </artifactItems>
- <outputDirectory>${project.build.directory}/lib</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
deleted file mode 100644
index 86da807..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Enumeration;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.BKException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Input stream which reads from a BookKeeper ledger.
- */
-class BookKeeperEditLogInputStream extends EditLogInputStream {
- static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
-
- private final long firstTxId;
- private final long lastTxId;
- private final int logVersion;
- private final boolean inProgress;
- private final LedgerHandle lh;
-
- private final FSEditLogOp.Reader reader;
- private final FSEditLogLoader.PositionTrackingInputStream tracker;
-
- /**
- * Construct BookKeeper edit log input stream.
- * Starts reading from the first entry of the ledger.
- */
- BookKeeperEditLogInputStream(final LedgerHandle lh,
- final EditLogLedgerMetadata metadata)
- throws IOException {
- this(lh, metadata, 0);
- }
-
- /**
- * Construct BookKeeper edit log input stream.
- * Starts reading from firstBookKeeperEntry. This allows the stream
- * to take a shortcut during recovery, as it doesn't have to read
- * every edit log transaction to find out what the last one is.
- */
- BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
- long firstBookKeeperEntry)
- throws IOException {
- this.lh = lh;
- this.firstTxId = metadata.getFirstTxId();
- this.lastTxId = metadata.getLastTxId();
- this.logVersion = metadata.getDataLayoutVersion();
- this.inProgress = metadata.isInProgress();
-
- if (firstBookKeeperEntry < 0
- || firstBookKeeperEntry > lh.getLastAddConfirmed()) {
- throw new IOException("Invalid first bk entry to read: "
- + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
- }
- BufferedInputStream bin = new BufferedInputStream(
- new LedgerInputStream(lh, firstBookKeeperEntry));
- tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
- DataInputStream in = new DataInputStream(tracker);
-
- reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
- }
-
- @Override
- public long getFirstTxId() {
- return firstTxId;
- }
-
- @Override
- public long getLastTxId() {
- return lastTxId;
- }
-
- @Override
- public int getVersion(boolean verifyVersion) throws IOException {
- return logVersion;
- }
-
- @Override
- protected FSEditLogOp nextOp() throws IOException {
- return reader.readOp(false);
- }
-
- @Override
- public void close() throws IOException {
- try {
- lh.close();
- } catch (BKException e) {
- throw new IOException("Exception closing ledger", e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted closing ledger", e);
- }
- }
-
- @Override
- public long getPosition() {
- return tracker.getPos();
- }
-
- @Override
- public long length() throws IOException {
- return lh.getLength();
- }
-
- @Override
- public String getName() {
- return String.format(
- "BookKeeperLedger[ledgerId=%d,firstTxId=%d,lastTxId=%d]", lh.getId(),
- firstTxId, lastTxId);
- }
-
- @Override
- public boolean isInProgress() {
- return inProgress;
- }
-
- /**
- * Skip forward to specified transaction id.
- * Currently we do this by just iterating forward.
- * If this proves to be too expensive, this can be reimplemented
- * with a binary search over bk entries
- */
- public void skipTo(long txId) throws IOException {
- long numToSkip = getFirstTxId() - txId;
-
- FSEditLogOp op = null;
- for (long i = 0; i < numToSkip; i++) {
- op = readOp();
- }
- if (op != null && op.getTransactionId() != txId-1) {
- throw new IOException("Corrupt stream, expected txid "
- + (txId-1) + ", got " + op.getTransactionId());
- }
- }
-
- @Override
- public String toString() {
- return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
- }
-
- @Override
- public void setMaxOpSize(int maxOpSize) {
- reader.setMaxOpSize(maxOpSize);
- }
-
- @Override
- public boolean isLocalLog() {
- return false;
- }
-
- /**
- * Input stream implementation which can be used by
- * FSEditLogOp.Reader
- */
- private static class LedgerInputStream extends InputStream {
- private long readEntries;
- private InputStream entryStream = null;
- private final LedgerHandle lh;
- private final long maxEntry;
-
- /**
- * Construct ledger input stream
- * @param lh the ledger handle to read from
- * @param firstBookKeeperEntry ledger entry to start reading from
- */
- LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry)
- throws IOException {
- this.lh = lh;
- readEntries = firstBookKeeperEntry;
-
- maxEntry = lh.getLastAddConfirmed();
- }
-
- /**
- * Get input stream representing next entry in the
- * ledger.
- * @return input stream, or null if no more entries
- */
- private InputStream nextStream() throws IOException {
- try {
- if (readEntries > maxEntry) {
- return null;
- }
- Enumeration<LedgerEntry> entries
- = lh.readEntries(readEntries, readEntries);
- readEntries++;
- if (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- assert !entries.hasMoreElements();
- return e.getEntryInputStream();
- }
- } catch (BKException e) {
- throw new IOException("Error reading entries from bookkeeper", e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted reading entries from bookkeeper", e);
- }
- return null;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- if (read(b, 0, 1) != 1) {
- return -1;
- } else {
- return b[0];
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- try {
- int read = 0;
- if (entryStream == null) {
- entryStream = nextStream();
- if (entryStream == null) {
- return read;
- }
- }
-
- while (read < len) {
- int thisread = entryStream.read(b, off+read, (len-read));
- if (thisread == -1) {
- entryStream = nextStream();
- if (entryStream == null) {
- return read;
- }
- } else {
- read += thisread;
- }
- }
- return read;
- } catch (IOException e) {
- throw e;
- }
-
- }
- }
-}
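(Editorial note: the deleted LedgerInputStream above presents a sequence of discrete ledger entries as one continuous InputStream, advancing to the next entry's stream whenever the current one is drained. The same pattern can be sketched without any BookKeeper dependency, using in-memory byte arrays as stand-in "entries"; all names below are hypothetical, and unlike the original it returns -1 at end-of-stream to honour the InputStream contract.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;

/**
 * Sketch of the LedgerInputStream pattern: expose a sequence of discrete
 * "entries" as one continuous InputStream. In-memory byte arrays stand in
 * for BookKeeper ledger entries.
 */
public class ConcatenatedEntryStream extends InputStream {
  private final Iterator<byte[]> entries;
  private InputStream current;

  public ConcatenatedEntryStream(List<byte[]> entryList) {
    this.entries = entryList.iterator();
  }

  /** Next entry as a stream, or null when the "ledger" is exhausted. */
  private InputStream nextStream() {
    return entries.hasNext() ? new ByteArrayInputStream(entries.next()) : null;
  }

  @Override
  public int read() throws IOException {
    byte[] b = new byte[1];
    return (read(b, 0, 1) == 1) ? (b[0] & 0xFF) : -1;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int read = 0;
    while (read < len) {
      if (current == null) {
        current = nextStream();
        if (current == null) {
          // Signal EOF with -1 per the InputStream contract.
          return read == 0 ? -1 : read;
        }
      }
      int n = current.read(b, off + read, len - read);
      if (n == -1) {
        current = null; // entry exhausted; fall through to the next entry
      } else {
        read += n;
      }
    }
    return read;
  }

  /** Drain every entry through the stream. */
  public static byte[] readAll(List<byte[]> entryList) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = new ConcatenatedEntryStream(entryList)) {
      byte[] buf = new byte[4];
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        out.write(buf, 0, n);
      }
    }
    return out.toByteArray();
  }
}
```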
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
deleted file mode 100644
index 865806b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
-
-import java.util.Arrays;
-
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.io.DataOutputBuffer;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Output stream for BookKeeper Journal.
- * Multiple complete edit log entries are packed into a single bookkeeper
- * entry before sending it over the network. The fact that the edit log entries
- * are complete in the bookkeeper entries means that each bookkeeper log entry
- * can be read as a complete edit log. This is useful for recovery, as we don't
- * need to read through the entire edit log segment to get the last written
- * entry.
- */
-class BookKeeperEditLogOutputStream
- extends EditLogOutputStream implements AddCallback {
- static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class);
-
- private final DataOutputBuffer bufCurrent;
- private final AtomicInteger outstandingRequests;
- private final int transmissionThreshold;
- private final LedgerHandle lh;
- private CountDownLatch syncLatch;
- private final AtomicInteger transmitResult
- = new AtomicInteger(BKException.Code.OK);
- private final Writer writer;
-
- /**
- * Construct an edit log output stream which writes to a ledger.
-
- */
- protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
- throws IOException {
- super();
-
- bufCurrent = new DataOutputBuffer();
- outstandingRequests = new AtomicInteger(0);
- syncLatch = null;
- this.lh = lh;
- this.writer = new Writer(bufCurrent);
- this.transmissionThreshold
- = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
- BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
- }
-
- @Override
- public void create(int layoutVersion) throws IOException {
- // noop
- }
-
- @Override
- public void close() throws IOException {
- setReadyToFlush();
- flushAndSync(true);
- try {
- lh.close();
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted waiting on close", ie);
- } catch (BKException bke) {
- throw new IOException("BookKeeper error during close", bke);
- }
- }
-
- @Override
- public void abort() throws IOException {
- try {
- lh.close();
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted waiting on close", ie);
- } catch (BKException bke) {
- throw new IOException("BookKeeper error during abort", bke);
- }
-
- }
-
- @Override
- public void writeRaw(final byte[] data, int off, int len) throws IOException {
- throw new IOException("Not supported for BK");
- }
-
- @Override
- public void write(FSEditLogOp op) throws IOException {
- writer.writeOp(op);
-
- if (bufCurrent.getLength() > transmissionThreshold) {
- transmit();
- }
- }
-
- @Override
- public void setReadyToFlush() throws IOException {
- transmit();
-
- synchronized (this) {
- syncLatch = new CountDownLatch(outstandingRequests.get());
- }
- }
-
- @Override
- public void flushAndSync(boolean durable) throws IOException {
- assert(syncLatch != null);
- try {
- syncLatch.await();
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted waiting on latch", ie);
- }
- if (transmitResult.get() != BKException.Code.OK) {
- throw new IOException("Failed to write to bookkeeper; Error is ("
- + transmitResult.get() + ") "
- + BKException.getMessage(transmitResult.get()));
- }
-
- syncLatch = null;
- // wait for whatever we wait on
- }
-
- /**
- * Transmit the current buffer to bookkeeper.
- * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
- * are never called at the same time.
- */
- private void transmit() throws IOException {
- if (!transmitResult.compareAndSet(BKException.Code.OK,
- BKException.Code.OK)) {
- throw new IOException("Trying to write to an errored stream;"
- + " Error code : (" + transmitResult.get()
- + ") " + BKException.getMessage(transmitResult.get()));
- }
- if (bufCurrent.getLength() > 0) {
- byte[] entry = Arrays.copyOf(bufCurrent.getData(),
- bufCurrent.getLength());
- lh.asyncAddEntry(entry, this, null);
- bufCurrent.reset();
- outstandingRequests.incrementAndGet();
- }
- }
-
- @Override
- public void addComplete(int rc, LedgerHandle handle,
- long entryId, Object ctx) {
- synchronized(this) {
- outstandingRequests.decrementAndGet();
- if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) {
- LOG.warn("Tried to set transmit result to (" + rc + ") \""
- + BKException.getMessage(rc) + "\""
- + " but is already (" + transmitResult.get() + ") \""
- + BKException.getMessage(transmitResult.get()) + "\"");
- }
- CountDownLatch l = syncLatch;
- if (l != null) {
- l.countDown();
- }
- }
- }
-}
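(Editorial note: the deleted output stream above coordinates flushes with an outstanding-request counter and a CountDownLatch: transmit() increments the counter for each in-flight async write, setReadyToFlush() snapshots the counter into a latch, the async addComplete() callback counts it down, and flushAndSync() blocks on the latch. That protocol can be sketched on its own, with a plain thread simulating the BookKeeper completion callbacks; all names below are hypothetical.)

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the latch-based flush protocol: writes are acknowledged
 * asynchronously, and flushAndSync() waits until every write that was
 * outstanding at setReadyToFlush() time has been acknowledged.
 */
public class LatchFlushSketch {
  private final AtomicInteger outstanding = new AtomicInteger(0);
  private volatile CountDownLatch syncLatch;

  /** Simulates transmitting one buffered batch as an async write. */
  public void transmit() {
    outstanding.incrementAndGet();
  }

  /** Snapshot the in-flight count into a latch, as setReadyToFlush() does. */
  public synchronized void setReadyToFlush() {
    syncLatch = new CountDownLatch(outstanding.get());
  }

  /** Simulates the async completion callback (addComplete). */
  public synchronized void addComplete() {
    outstanding.decrementAndGet();
    CountDownLatch l = syncLatch;
    if (l != null) {
      l.countDown();
    }
  }

  /** Block until every outstanding write has been acknowledged. */
  public void flushAndSync() throws InterruptedException {
    syncLatch.await();
    syncLatch = null;
  }

  /** Two writes, acknowledged from another thread, then a blocking flush. */
  public static boolean demo() throws InterruptedException {
    LatchFlushSketch s = new LatchFlushSketch();
    s.transmit();
    s.transmit();
    s.setReadyToFlush();
    Thread acker = new Thread(() -> {
      s.addComplete();
      s.addComplete();
    });
    acker.start();
    s.flushAndSync(); // returns only after both acknowledgements
    acker.join();
    return s.outstanding.get() == 0;
  }
}
```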
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
deleted file mode 100644
index 8e4d032..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ /dev/null
@@ -1,893 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.util.ZkUtils;
-
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.ZKUtil;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
-
-import java.net.URI;
-
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.commons.io.Charsets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import com.google.common.annotations.VisibleForTesting;
-/**
- * BookKeeper Journal Manager
- *
- * To use, add the following to hdfs-site.xml.
- * <pre>
- * {@code
- * <property>
- * <name>dfs.namenode.edits.dir</name>
- * <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
- * </property>
- *
- * <property>
- * <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
- * <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
- * </property>
- * }
- * </pre>
- * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
- * [zkEnsemble] is a list of semicolon-separated zookeeper host:port
- * pairs. In the example above there are 3 servers in the ensemble:
- * zk1, zk2 & zk3, each one listening on port 2181.
- *
- * [rootZnode] is the path of the zookeeper znode under which the editlog
- * information will be stored.
- *
- * Other configuration options are:
- * <ul>
- * <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
- * Number of bytes a bookkeeper journal stream will buffer before
- * forcing a flush. Default is 1024.</li>
- * <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
- * Number of bookkeeper servers in edit log ledger ensembles. This
- * is the number of bookkeeper servers which need to be available
- * for the ledger to be writable. Default is 3.</li>
- * <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
- * Number of bookkeeper servers in the write quorum. This is the
- * number of bookkeeper servers which must have acknowledged the
- * write of an entry before it is considered written.
- * Default is 2.</li>
- * <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
- * Password to use when creating ledgers. </li>
- * <li><b>dfs.namenode.bookkeeperjournal.zk.session.timeout</b>
- * Session timeout for Zookeeper client from BookKeeper Journal Manager.
- * Hadoop recommends that this value should be less than the ZKFC
- * session timeout value. Default value is 3000.</li>
- * </ul>
- */
-public class BookKeeperJournalManager implements JournalManager {
- static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
-
- public static final String BKJM_OUTPUT_BUFFER_SIZE
- = "dfs.namenode.bookkeeperjournal.output-buffer-size";
- public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
-
- public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
- = "dfs.namenode.bookkeeperjournal.ensemble-size";
- public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
-
- public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
- = "dfs.namenode.bookkeeperjournal.quorum-size";
- public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
-
- public static final String BKJM_BOOKKEEPER_DIGEST_PW
- = "dfs.namenode.bookkeeperjournal.digestPw";
- public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
-
- private static final int BKJM_LAYOUT_VERSION = -1;
-
- public static final String BKJM_ZK_SESSION_TIMEOUT
- = "dfs.namenode.bookkeeperjournal.zk.session.timeout";
- public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
-
- private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
-
- public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH
- = "dfs.namenode.bookkeeperjournal.zk.availablebookies";
-
- public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
- = "/ledgers/available";
-
- public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
- = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
- public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
- = 2000;
-
- public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
- = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
- public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
-
- public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE
- = "dfs.namenode.bookkeeperjournal.ack.quorum-size";
-
- public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
- = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
- public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
-
- private ZooKeeper zkc;
- private final Configuration conf;
- private final BookKeeper bkc;
- private final CurrentInprogress ci;
- private final String basePath;
- private final String ledgerPath;
- private final String versionPath;
- private final MaxTxId maxTxId;
- private final int ensembleSize;
- private final int quorumSize;
- private final int ackQuorumSize;
- private final int addEntryTimeout;
- private final String digestpw;
- private final int speculativeReadTimeout;
- private final int readEntryTimeout;
- private final CountDownLatch zkConnectLatch;
- private final NamespaceInfo nsInfo;
- private boolean initialized = false;
- private LedgerHandle currentLedger = null;
-
- /**
- * Construct a Bookkeeper journal manager.
- */
- public BookKeeperJournalManager(Configuration conf, URI uri,
- NamespaceInfo nsInfo) throws IOException {
- this.conf = conf;
- this.nsInfo = nsInfo;
-
- String zkConnect = uri.getAuthority().replace(";", ",");
- basePath = uri.getPath();
- ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
- BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
- quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
- BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
- ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
- addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
- BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
- speculativeReadTimeout = conf.getInt(
- BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
- BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
- readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
- BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
-
- ledgerPath = basePath + "/ledgers";
- String maxTxIdPath = basePath + "/maxtxid";
- String currentInprogressNodePath = basePath + "/CurrentInprogress";
- versionPath = basePath + "/version";
- digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
- BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
-
- try {
- zkConnectLatch = new CountDownLatch(1);
- int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
- BKJM_ZK_SESSION_TIMEOUT_DEFAULT);
- zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout,
- new ZkConnectionWatcher());
- // Configured zk session timeout + some extra grace period (here
- // BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period)
- int zkConnectionLatchTimeout = bkjmZKSessionTimeout
- + BKJM_ZK_SESSION_TIMEOUT_DEFAULT;
- if (!zkConnectLatch
- .await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
- throw new IOException("Error connecting to zookeeper");
- }
-
- prepareBookKeeperEnv();
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
- clientConf.setReadEntryTimeout(readEntryTimeout);
- clientConf.setAddEntryTimeout(addEntryTimeout);
- bkc = new BookKeeper(clientConf, zkc);
- } catch (KeeperException e) {
- throw new IOException("Error initializing zk", e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while initializing bk journal manager",
- ie);
- }
-
- ci = new CurrentInprogress(zkc, currentInprogressNodePath);
- maxTxId = new MaxTxId(zkc, maxTxIdPath);
- }
-
- /**
- * Pre-create the bookkeeper metadata path in zookeeper.
- */
- private void prepareBookKeeperEnv() throws IOException {
- // create the bookie available path in zookeeper if it doesn't exist
- final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH,
- BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
- final CountDownLatch zkPathLatch = new CountDownLatch(1);
-
- final AtomicBoolean success = new AtomicBoolean(false);
- StringCallback callback = new StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if (KeeperException.Code.OK.intValue() == rc
- || KeeperException.Code.NODEEXISTS.intValue() == rc) {
- LOG.info("Successfully created bookie available path : "
- + zkAvailablePath);
- success.set(true);
- } else {
- KeeperException.Code code = KeeperException.Code.get(rc);
- LOG.error("Error : "
- + KeeperException.create(code, path).getMessage()
- + ", failed to create bookie available path : "
- + zkAvailablePath);
- }
- zkPathLatch.countDown();
- }
- };
- ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
-
- try {
- if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
- || !success.get()) {
- throw new IOException("Couldn't create bookie available path :"
- + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
- + " millis");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(
- "Interrupted when creating the bookie available path : "
- + zkAvailablePath, e);
- }
- }
-
- @Override
- public void format(NamespaceInfo ns) throws IOException {
- try {
- // delete old info
- Stat baseStat = null;
- Stat ledgerStat = null;
- if ((baseStat = zkc.exists(basePath, false)) != null) {
- if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
- for (EditLogLedgerMetadata l : getLedgerList(true)) {
- try {
- bkc.deleteLedger(l.getLedgerId());
- } catch (BKException.BKNoSuchLedgerExistsException bke) {
- LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
- + " Cannot delete.");
- }
- }
- }
- ZKUtil.deleteRecursive(zkc, basePath);
- }
-
- // should be clean now.
- zkc.create(basePath, new byte[] {'0'},
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- VersionProto.Builder builder = VersionProto.newBuilder();
- builder.setNamespaceInfo(PBHelper.convert(ns))
- .setLayoutVersion(BKJM_LAYOUT_VERSION);
-
- byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
- zkc.create(versionPath, data,
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- zkc.create(ledgerPath, new byte[] {'0'},
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException ke) {
- LOG.error("Error accessing zookeeper to format", ke);
- throw new IOException("Error accessing zookeeper to format", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted during format", ie);
- } catch (BKException bke) {
- throw new IOException("Error cleaning up ledgers during format", bke);
- }
- }
-
- @Override
- public boolean hasSomeData() throws IOException {
- try {
- return zkc.exists(basePath, false) != null;
- } catch (KeeperException ke) {
- throw new IOException("Couldn't contact zookeeper", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while checking for data", ie);
- }
- }
-
- synchronized private void checkEnv() throws IOException {
- if (!initialized) {
- try {
- Stat versionStat = zkc.exists(versionPath, false);
- if (versionStat == null) {
- throw new IOException("Environment not initialized. "
- +"Have you forgotten to format?");
- }
- byte[] d = zkc.getData(versionPath, false, versionStat);
-
- VersionProto.Builder builder = VersionProto.newBuilder();
- TextFormat.merge(new String(d, UTF_8), builder);
- if (!builder.isInitialized()) {
- throw new IOException("Invalid/Incomplete data in znode");
- }
- VersionProto vp = builder.build();
-
- // There's only one version at the moment
- assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
-
- NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
-
- if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
- !nsInfo.clusterID.equals(readns.getClusterID()) ||
- !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
- String err = String.format("Environment mismatch. Running process %s"
- +", stored in ZK %s", nsInfo, readns);
- LOG.error(err);
- throw new IOException(err);
- }
-
- ci.init();
- initialized = true;
- } catch (KeeperException ke) {
- throw new IOException("Cannot access ZooKeeper", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while checking environment", ie);
- }
- }
- }
-
- /**
- * Start a new log segment in a BookKeeper ledger.
- * First ensure that we have the write lock for this journal.
- * Then create a ledger and stream based on that ledger.
- * The ledger id is written to the inprogress znode, so that in the
- * case of a crash, a recovery process can find the ledger we were writing
- * to when we crashed.
- * @param txId First transaction id to be written to the stream
- */
- @Override
- public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
- throws IOException {
- checkEnv();
-
- if (txId <= maxTxId.get()) {
- throw new IOException("We've already seen " + txId
- + ". A new stream cannot be created with it");
- }
-
- try {
- String existingInprogressNode = ci.read();
- if (null != existingInprogressNode
- && zkc.exists(existingInprogressNode, false) != null) {
- throw new IOException("Inprogress node already exists");
- }
- if (currentLedger != null) {
- // bookkeeper errored on last stream, clean up ledger
- currentLedger.close();
- }
- currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
- BookKeeper.DigestType.MAC,
- digestpw.getBytes(Charsets.UTF_8));
- } catch (BKException bke) {
- throw new IOException("Error creating ledger", bke);
- } catch (KeeperException ke) {
- throw new IOException("Error in zookeeper while creating ledger", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted creating ledger", ie);
- }
-
- try {
- String znodePath = inprogressZNode(txId);
- EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
- layoutVersion, currentLedger.getId(), txId);
- /* Write the ledger metadata out to the inprogress ledger znode
- * This can fail if for some reason our write lock has
- * expired (@see WriteLock) and another process has managed to
- * create the inprogress znode.
- * In this case, throw an exception. We don't want to continue
- * as this would lead to a split brain situation.
- */
- l.write(zkc, znodePath);
-
- maxTxId.store(txId);
- ci.update(znodePath);
- return new BookKeeperEditLogOutputStream(conf, currentLedger);
- } catch (KeeperException ke) {
- cleanupLedger(currentLedger);
- throw new IOException("Error storing ledger metadata", ke);
- }
- }
-
- private void cleanupLedger(LedgerHandle lh) {
- try {
- long id = lh.getId();
- lh.close();
- bkc.deleteLedger(id);
- } catch (BKException bke) {
- //log & ignore, an IOException will be thrown soon
- LOG.error("Error closing ledger", bke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while closing ledger", ie);
- }
- }
-
-
- /**
- * Finalize a log segment. If the journal manager is currently
- * writing to a ledger, ensure that this is the ledger of the log segment
- * being finalized.
- *
- * Otherwise this is the recovery case. In the recovery case, ensure that
- * the firstTxId of the ledger matches firstTxId for the segment we are
- * trying to finalize.
- */
- @Override
- public void finalizeLogSegment(long firstTxId, long lastTxId)
- throws IOException {
- checkEnv();
-
- String inprogressPath = inprogressZNode(firstTxId);
- try {
- Stat inprogressStat = zkc.exists(inprogressPath, false);
- if (inprogressStat == null) {
- throw new IOException("Inprogress znode " + inprogressPath
- + " doesn't exist");
- }
-
- EditLogLedgerMetadata l
- = EditLogLedgerMetadata.read(zkc, inprogressPath);
-
- if (currentLedger != null) { // normal, non-recovery case
- if (l.getLedgerId() == currentLedger.getId()) {
- try {
- currentLedger.close();
- } catch (BKException bke) {
- LOG.error("Error closing current ledger", bke);
- }
- currentLedger = null;
- } else {
- throw new IOException(
- "Active ledger has a different ID from the inprogress znode. "
- + l.getLedgerId() + " found, "
- + currentLedger.getId() + " expected");
- }
- }
-
- if (l.getFirstTxId() != firstTxId) {
- throw new IOException("Transaction id not as expected, "
- + l.getFirstTxId() + " found, " + firstTxId + " expected");
- }
-
- l.finalizeLedger(lastTxId);
- String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
- try {
- l.write(zkc, finalisedPath);
- } catch (KeeperException.NodeExistsException nee) {
- if (!l.verify(zkc, finalisedPath)) {
- throw new IOException("Node " + finalisedPath + " already exists"
- + " but data doesn't match");
- }
- }
- maxTxId.store(lastTxId);
- zkc.delete(inprogressPath, inprogressStat.getVersion());
- String inprogressPathFromCI = ci.read();
- if (inprogressPath.equals(inprogressPathFromCI)) {
- ci.clear();
- }
- } catch (KeeperException e) {
- throw new IOException("Error finalising ledger", e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Error finalising ledger", ie);
- }
- }
-
- public void selectInputStreams(
- Collection<EditLogInputStream> streams,
- long fromTxnId, boolean inProgressOk) throws IOException {
- selectInputStreams(streams, fromTxnId, inProgressOk, false);
- }
-
- @Override
- public void selectInputStreams(Collection<EditLogInputStream> streams,
- long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
- throws IOException {
- List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
- inProgressOk);
- try {
- BookKeeperEditLogInputStream elis = null;
- for (EditLogLedgerMetadata l : currentLedgerList) {
- long lastTxId = l.getLastTxId();
- if (l.isInProgress()) {
- lastTxId = recoverLastTxId(l, false);
- }
- // Check once again; required in the in-progress case and in case of
- // any gap.
- if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
- LedgerHandle h;
- if (l.isInProgress()) { // we don't want to fence the current journal
- h = bkc.openLedgerNoRecovery(l.getLedgerId(),
- BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
- } else {
- h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
- digestpw.getBytes(Charsets.UTF_8));
- }
- elis = new BookKeeperEditLogInputStream(h, l);
- elis.skipTo(fromTxId);
- } else {
- // If it mismatches, there might be a gap, so we should not
- // check any further.
- return;
- }
- streams.add(elis);
- if (elis.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
- return;
- }
- fromTxId = elis.getLastTxId() + 1;
- }
- } catch (BKException e) {
- throw new IOException("Could not open ledger for " + fromTxId, e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
- }
- }
-
- long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
- throws IOException {
- long count = 0;
- long expectedStart = 0;
- for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
- long lastTxId = l.getLastTxId();
- if (l.isInProgress()) {
- lastTxId = recoverLastTxId(l, false);
- if (lastTxId == HdfsServerConstants.INVALID_TXID) {
- break;
- }
- }
-
- assert lastTxId >= l.getFirstTxId();
-
- if (lastTxId < fromTxId) {
- continue;
- } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
- // we can start in the middle of a segment
- count = (lastTxId - l.getFirstTxId()) + 1;
- expectedStart = lastTxId + 1;
- } else {
- if (expectedStart != l.getFirstTxId()) {
- if (count == 0) {
- throw new CorruptionException("StartTxId " + l.getFirstTxId()
- + " is not as expected " + expectedStart
- + ". Gap in transaction log?");
- } else {
- break;
- }
- }
- count += (lastTxId - l.getFirstTxId()) + 1;
- expectedStart = lastTxId + 1;
- }
- }
- return count;
- }
-
- @Override
- public void recoverUnfinalizedSegments() throws IOException {
- checkEnv();
-
- synchronized (this) {
- try {
- List<String> children = zkc.getChildren(ledgerPath, false);
- for (String child : children) {
- if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
- continue;
- }
- String znode = ledgerPath + "/" + child;
- EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
- try {
- long endTxId = recoverLastTxId(l, true);
- if (endTxId == HdfsServerConstants.INVALID_TXID) {
- LOG.error("Unrecoverable corruption has occurred in segment "
- + l.toString() + " at path " + znode
- + ". Unable to continue recovery.");
- throw new IOException("Unrecoverable corruption,"
- + " please check logs.");
- }
- finalizeLogSegment(l.getFirstTxId(), endTxId);
- } catch (SegmentEmptyException see) {
- LOG.warn("Inprogress znode " + child
- + " refers to a ledger which is empty. This occurs when the NN"
- + " crashes after opening a segment, but before writing the"
- + " OP_START_LOG_SEGMENT op. It is safe to delete."
- + " MetaData [" + l.toString() + "]");
-
- // If the max seen transaction is the same as what would
- // have been the first transaction of the failed ledger,
- // decrement it, as that transaction never happened and as
- // such, is _not_ the last seen
- if (maxTxId.get() == l.getFirstTxId()) {
- maxTxId.reset(maxTxId.get() - 1);
- }
-
- zkc.delete(znode, -1);
- }
- }
- } catch (KeeperException.NoNodeException nne) {
- // nothing to recover, ignore
- } catch (KeeperException ke) {
- throw new IOException("Couldn't get list of inprogress segments", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted getting list of inprogress segments",
- ie);
- }
- }
- }
-
- @Override
- public void purgeLogsOlderThan(long minTxIdToKeep)
- throws IOException {
- checkEnv();
-
- for (EditLogLedgerMetadata l : getLedgerList(false)) {
- if (l.getLastTxId() < minTxIdToKeep) {
- try {
- Stat stat = zkc.exists(l.getZkPath(), false);
- zkc.delete(l.getZkPath(), stat.getVersion());
- bkc.deleteLedger(l.getLedgerId());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupted while purging " + l, ie);
- } catch (BKException bke) {
- LOG.error("Couldn't delete ledger from bookkeeper", bke);
- } catch (KeeperException ke) {
- LOG.error("Error deleting ledger entry in zookeeper", ke);
- }
- }
- }
- }
-
- @Override
- public void doPreUpgrade() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void doUpgrade(Storage storage) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getJournalCTime() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void doFinalize() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
- int targetLayoutVersion) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void doRollback() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void discardSegments(long startTxId) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- try {
- bkc.close();
- zkc.close();
- } catch (BKException bke) {
- throw new IOException("Couldn't close bookkeeper client", bke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while closing journal manager", ie);
- }
- }
-
- /**
- * Set the amount of memory that this stream should use to buffer edits.
- * Setting this will only affect future output streams. Streams
- * which have already been created won't be affected.
- */
- @Override
- public void setOutputBufferCapacity(int size) {
- conf.setInt(BKJM_OUTPUT_BUFFER_SIZE, size);
- }
-
- /**
- * Find the id of the last edit log transaction written to an edit log
- * ledger.
- */
- private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
- throws IOException, SegmentEmptyException {
- LedgerHandle lh = null;
- try {
- if (fence) {
- lh = bkc.openLedger(l.getLedgerId(),
- BookKeeper.DigestType.MAC,
- digestpw.getBytes(Charsets.UTF_8));
- } else {
- lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
- BookKeeper.DigestType.MAC,
- digestpw.getBytes(Charsets.UTF_8));
- }
- } catch (BKException bke) {
- throw new IOException("Exception opening ledger for " + l, bke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted opening ledger for " + l, ie);
- }
-
- BookKeeperEditLogInputStream in = null;
-
- try {
- long lastAddConfirmed = lh.getLastAddConfirmed();
- if (lastAddConfirmed == -1) {
- throw new SegmentEmptyException();
- }
-
- in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
-
- long endTxId = HdfsServerConstants.INVALID_TXID;
- FSEditLogOp op = in.readOp();
- while (op != null) {
- if (endTxId == HdfsServerConstants.INVALID_TXID
- || op.getTransactionId() == endTxId+1) {
- endTxId = op.getTransactionId();
- }
- op = in.readOp();
- }
- return endTxId;
- } finally {
- if (in != null) {
- in.close();
- }
- }
- }
-
- /**
- * Get a list of all segments in the journal.
- */
- List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
- throws IOException {
- return getLedgerList(-1, inProgressOk);
- }
-
- private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
- boolean inProgressOk) throws IOException {
- List<EditLogLedgerMetadata> ledgers
- = new ArrayList<EditLogLedgerMetadata>();
- try {
- List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
- for (String ledgerName : ledgerNames) {
- if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
- continue;
- }
- String ledgerMetadataPath = ledgerPath + "/" + ledgerName;
- try {
- EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
- .read(zkc, ledgerMetadataPath);
- if (editLogLedgerMetadata.getLastTxId() != HdfsServerConstants.INVALID_TXID
- && editLogLedgerMetadata.getLastTxId() < fromTxId) {
- // exclude already-read closed edits, but include inprogress edits,
- // as they will be handled by the caller
- continue;
- }
- ledgers.add(editLogLedgerMetadata);
- } catch (KeeperException.NoNodeException e) {
- LOG.warn("ZNode: " + ledgerMetadataPath
- + " might have been finalized and deleted,"
- + " so ignoring NoNodeException.");
- }
- }
- } catch (KeeperException e) {
- throw new IOException("Exception reading ledger list from zk", e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted getting list of ledgers from zk", ie);
- }
-
- Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
- return ledgers;
- }
-
- /**
- * Get the znode path for a finalized ledger.
- */
- String finalizedLedgerZNode(long startTxId, long endTxId) {
- return String.format("%s/edits_%018d_%018d",
- ledgerPath, startTxId, endTxId);
- }
-
- /**
- * Get the znode path for an in-progress log segment.
- */
- String inprogressZNode(long startTxid) {
- return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
- }
-
- @VisibleForTesting
- void setZooKeeper(ZooKeeper zk) {
- this.zkc = zk;
- }
-
- /**
- * Simple watcher to notify when zookeeper has connected
- */
- private class ZkConnectionWatcher implements Watcher {
- public void process(WatchedEvent event) {
- if (Event.KeeperState.SyncConnected.equals(event.getState())) {
- zkConnectLatch.countDown();
- }
- }
- }
-
- private static class SegmentEmptyException extends IOException {
- }
-}
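The retired manager above derives its ZooKeeper connect string from the journal URI and encodes segment state in znode names (`inprogress_<hex-txid>` and `edits_<first>_<last>`). A minimal standalone sketch of those conventions, using only the JDK — class and method names here are illustrative, not part of the removed code's API:

```java
import java.net.URI;

// Illustrative sketch of the naming conventions used by the retired
// BookKeeperJournalManager; the class name BkjmPaths is hypothetical.
public class BkjmPaths {

    // bookkeeper://zk1:2181;zk2:2181/path -> "zk1:2181,zk2:2181",
    // the comma-separated form the ZooKeeper client expects
    static String toZkConnect(URI uri) {
        return uri.getAuthority().replace(";", ",");
    }

    // In-progress segments are named with the first txid in hex
    static String inprogressZNode(String ledgerPath, long startTxId) {
        return ledgerPath + "/inprogress_" + Long.toString(startTxId, 16);
    }

    // Finalized segments are named edits_<first>_<last>, zero-padded to 18 digits
    static String finalizedLedgerZNode(String ledgerPath, long startTxId,
                                       long endTxId) {
        return String.format("%s/edits_%018d_%018d", ledgerPath, startTxId, endTxId);
    }

    public static void main(String[] args) {
        URI uri = URI.create("bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal");
        System.out.println(toZkConnect(uri));
        System.out.println(inprogressZNode("/hdfsjournal/ledgers", 255L));
        System.out.println(finalizedLedgerZNode("/hdfsjournal/ledgers", 1L, 100L));
    }
}
```

Note that the semicolon-separated ensemble parses as a registry-based URI authority, which is why `getAuthority()` returns it verbatim for the comma substitution.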
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
deleted file mode 100644
index 32d65cb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Distributed write permission lock, using ZooKeeper. A read returns the
- * znode version number along with the inprogress node path currently stored
- * under the CurrentInprogress path. If a path is present, the caller can
- * assume that some other client is already operating on it, and act
- * accordingly. If no inprogress path is present, the caller can assume no
- * other client is operating, and should then update the node with its own
- * newly created inprogress node path. If any other activity has touched the
- * node in the meantime, the version number will have changed and the update
- * will fail. This read-then-update protocol therefore ensures that only one
- * client can continue after checking with CurrentInprogress.
- */
-
-class CurrentInprogress {
- static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
-
- private final ZooKeeper zkc;
- private final String currentInprogressNode;
- private volatile int versionNumberForPermission = -1;
- private final String hostName = InetAddress.getLocalHost().toString();
-
- CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
- this.currentInprogressNode = lockpath;
- this.zkc = zkc;
- }
-
- void init() throws IOException {
- try {
- Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
- false);
- if (isCurrentInprogressNodeExists == null) {
- try {
- zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (NodeExistsException e) {
- // Node might have been created by another process at the same time. Ignore it.
- if (LOG.isDebugEnabled()) {
- LOG.debug(currentInprogressNode + " already created by other process.",
- e);
- }
- }
- }
- } catch (KeeperException e) {
- throw new IOException("Exception accessing Zookeeper", e);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted accessing Zookeeper", ie);
- }
- }
-
- /**
- * Write the given path, along with the local hostname, to the node.
- *
- * @param path
- * - to be updated in zookeeper
- * @throws IOException
- */
- void update(String path) throws IOException {
- CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
- builder.setPath(path).setHostname(hostName);
-
- String content = TextFormat.printToString(builder.build());
-
- try {
- zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8),
- this.versionNumberForPermission);
- } catch (KeeperException e) {
- throw new IOException("Exception when setting the data "
- + "[" + content + "] to CurrentInprogress. ", e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while setting the data "
- + "[" + content + "] to CurrentInprogress", e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updated data[" + content + "] to CurrentInprogress");
- }
- }
-
- /**
- * Read the CurrentInprogress node data from Zookeeper, remembering the
- * znode version number for a later #update, and return the path saved by
- * the last #update call.
- *
- * @return the available inprogress node path, or null if not available.
- * @throws IOException
- */
- String read() throws IOException {
- Stat stat = new Stat();
- byte[] data = null;
- try {
- data = zkc.getData(this.currentInprogressNode, false, stat);
- } catch (KeeperException e) {
- throw new IOException("Exception while reading the data from "
- + currentInprogressNode, e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while reading data from "
- + currentInprogressNode, e);
- }
- this.versionNumberForPermission = stat.getVersion();
- if (data != null) {
- CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
- TextFormat.merge(new String(data, UTF_8), builder);
- if (!builder.isInitialized()) {
- throw new IOException("Invalid/Incomplete data in znode");
- }
- return builder.build().getPath();
- } else {
- LOG.debug("No data available in CurrentInprogress");
- }
- return null;
- }
-
- /** Clear the CurrentInprogress node data */
- void clear() throws IOException {
- try {
- zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
- } catch (KeeperException e) {
- throw new IOException(
- "Exception when setting the data to CurrentInprogress node", e);
- } catch (InterruptedException e) {
- throw new IOException(
- "Interrupted when setting the data to CurrentInprogress node", e);
- }
- LOG.debug("Cleared the data from CurrentInprogress");
- }
-
-}
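The versioned read-then-update protocol that CurrentInprogress's javadoc describes can be sketched with a plain in-memory stand-in for a znode. `VersionedNode` below is hypothetical, assumed only for illustration — it mimics ZooKeeper's `setData(path, data, version)` semantics, not its API:

```java
// Illustrative stand-in for a ZooKeeper znode with versioned setData;
// VersionedNode is hypothetical, not part of the removed code or ZooKeeper.
class VersionedNode {
    private String data;
    private int version = 0;

    // Read returns the current version; the caller remembers it for setData().
    synchronized int read() {
        return version;
    }

    // Mimics ZooKeeper setData(path, data, expectedVersion): a stale
    // version is rejected, which is what fences a second concurrent writer.
    synchronized void setData(String newData, int expectedVersion) {
        if (expectedVersion != version) {
            throw new IllegalStateException("BadVersion: expected "
                + expectedVersion + ", node is at " + version);
        }
        data = newData;
        version++;
    }
}

public class CurrentInprogressSketch {
    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();

        int seenByA = node.read();   // writer A reads version 0
        int seenByB = node.read();   // writer B reads the same version

        node.setData("inprogress_1", seenByA); // A updates first; version bumps
        try {
            node.setData("inprogress_2", seenByB); // B's stale update fails
        } catch (IllegalStateException e) {
            System.out.println("second writer fenced: " + e.getMessage());
        }
    }
}
```

The point of the sketch is the failure path: whichever writer updates second sees a version-mismatch error and must re-read before proceeding, so only one inprogress path wins.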
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
deleted file mode 100644
index 2d1f8b9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing the metadata associated
- * with a single edit log segment, stored in a single ledger
- */
-public class EditLogLedgerMetadata {
- static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
-
- private String zkPath;
- private final int dataLayoutVersion;
- private final long ledgerId;
- private final long firstTxId;
- private long lastTxId;
- private boolean inprogress;
-
- public static final Comparator COMPARATOR
- = new Comparator<EditLogLedgerMetadata>() {
- public int compare(EditLogLedgerMetadata o1,
- EditLogLedgerMetadata o2) {
- if (o1.firstTxId < o2.firstTxId) {
- return -1;
- } else if (o1.firstTxId == o2.firstTxId) {
- return 0;
- } else {
- return 1;
- }
- }
- };
-
- EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
- long ledgerId, long firstTxId) {
- this.zkPath = zkPath;
- this.dataLayoutVersion = dataLayoutVersion;
- this.ledgerId = ledgerId;
- this.firstTxId = firstTxId;
- this.lastTxId = HdfsServerConstants.INVALID_TXID;
- this.inprogress = true;
- }
-
- EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
- long ledgerId, long firstTxId,
- long lastTxId) {
- this.zkPath = zkPath;
- this.dataLayoutVersion = dataLayoutVersion;
- this.ledgerId = ledgerId;
- this.firstTxId = firstTxId;
- this.lastTxId = lastTxId;
- this.inprogress = false;
- }
-
- String getZkPath() {
- return zkPath;
- }
-
- long getFirstTxId() {
- return firstTxId;
- }
-
- long getLastTxId() {
- return lastTxId;
- }
-
- long getLedgerId() {
- return ledgerId;
- }
-
- boolean isInProgress() {
- return this.inprogress;
- }
-
- int getDataLayoutVersion() {
- return this.dataLayoutVersion;
- }
-
- void finalizeLedger(long newLastTxId) {
- assert this.lastTxId == HdfsServerConstants.INVALID_TXID;
- this.lastTxId = newLastTxId;
- this.inprogress = false;
- }
-
- static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
- throws IOException, KeeperException.NoNodeException {
- try {
- byte[] data = zkc.getData(path, false, null);
-
- EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading " + path + " data: " + new String(data, UTF_8));
- }
- TextFormat.merge(new String(data, UTF_8), builder);
- if (!builder.isInitialized()) {
- throw new IOException("Invalid/Incomplete data in znode");
- }
- EditLogLedgerProto ledger = builder.build();
-
- int dataLayoutVersion = ledger.getDataLayoutVersion();
- long ledgerId = ledger.getLedgerId();
- long firstTxId = ledger.getFirstTxId();
- if (ledger.hasLastTxId()) {
- long lastTxId = ledger.getLastTxId();
- return new EditLogLedgerMetadata(path, dataLayoutVersion,
- ledgerId, firstTxId, lastTxId);
- } else {
- return new EditLogLedgerMetadata(path, dataLayoutVersion,
- ledgerId, firstTxId);
- }
- } catch(KeeperException.NoNodeException nne) {
- throw nne;
- } catch(KeeperException ke) {
- throw new IOException("Error reading from zookeeper", ke);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted reading from zookeeper", ie);
- }
- }
-
- void write(ZooKeeper zkc, String path)
- throws IOException, KeeperException.NodeExistsException {
- this.zkPath = path;
-
- EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
- builder.setDataLayoutVersion(dataLayoutVersion)
- .setLedgerId(ledgerId).setFirstTxId(firstTxId);
-
- if (!inprogress) {
- builder.setLastTxId(lastTxId);
- }
- try {
- zkc.create(path, TextFormat.printToString(builder.build()).getBytes(UTF_8),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- throw nee;
- } catch (KeeperException e) {
- throw new IOException("Error creating ledger znode", e);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted creating ledger znode", ie);
- }
- }
-
- boolean verify(ZooKeeper zkc, String path) {
- try {
- EditLogLedgerMetadata other = read(zkc, path);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Verifying " + this.toString()
- + " against " + other);
- }
- return other.equals(this);
- } catch (KeeperException e) {
- LOG.error("Couldn't verify data in " + path, e);
- return false;
- } catch (IOException ie) {
- LOG.error("Couldn't verify data in " + path, ie);
- return false;
- }
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof EditLogLedgerMetadata)) {
- return false;
- }
- EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
- return ledgerId == ol.ledgerId
- && dataLayoutVersion == ol.dataLayoutVersion
- && firstTxId == ol.firstTxId
- && lastTxId == ol.lastTxId;
- }
-
- public int hashCode() {
- int hash = 1;
- hash = hash * 31 + (int) ledgerId;
- hash = hash * 31 + (int) firstTxId;
- hash = hash * 31 + (int) lastTxId;
- hash = hash * 31 + dataLayoutVersion;
- return hash;
- }
-
- public String toString() {
- return "[LedgerId:"+ledgerId +
- ", firstTxId:" + firstTxId +
- ", lastTxId:" + lastTxId +
- ", dataLayoutVersion:" + dataLayoutVersion + "]";
- }
-
-}
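EditLogLedgerMetadata's COMPARATOR orders segments by their first transaction id, so a sorted list of ledger znodes reads back in edit-log order. A trimmed, hypothetical stand-in (LedgerMeta, not the real class) showing the same ordering rule with Long-based comparison instead of the explicit if/else chain:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Trimmed, hypothetical stand-in for EditLogLedgerMetadata showing the
// ordering contract of its COMPARATOR: sort by firstTxId ascending.
class LedgerMeta {
  final long ledgerId;
  final long firstTxId;

  LedgerMeta(long ledgerId, long firstTxId) {
    this.ledgerId = ledgerId;
    this.firstTxId = firstTxId;
  }

  // Same rule as COMPARATOR above, written with comparingLong.
  static final Comparator<LedgerMeta> BY_FIRST_TXID =
      Comparator.comparingLong(m -> m.firstTxId);

  public static void main(String[] args) {
    List<LedgerMeta> ledgers = new ArrayList<>();
    ledgers.add(new LedgerMeta(7, 101));
    ledgers.add(new LedgerMeta(3, 1));
    ledgers.add(new LedgerMeta(5, 51));
    ledgers.sort(BY_FIRST_TXID);
    assert ledgers.get(0).firstTxId == 1;    // segments now in log order
    assert ledgers.get(1).firstTxId == 51;
    assert ledgers.get(2).firstTxId == 101;
  }
}
```

Note that, like the deleted equals()/hashCode(), the ordering ignores zkPath: two metadata objects describing the same ledger compare equal even if read from different znodes.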
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
deleted file mode 100644
index 5a2eefa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing and reading
- * the max seen txid in zookeeper
- */
-class MaxTxId {
- static final Log LOG = LogFactory.getLog(MaxTxId.class);
-
- private final ZooKeeper zkc;
- private final String path;
-
- private Stat currentStat;
-
- MaxTxId(ZooKeeper zkc, String path) {
- this.zkc = zkc;
- this.path = path;
- }
-
- synchronized void store(long maxTxId) throws IOException {
- long currentMax = get();
- if (currentMax < maxTxId) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Setting maxTxId to " + maxTxId);
- }
- reset(maxTxId);
- }
- }
-
- synchronized void reset(long maxTxId) throws IOException {
- try {
- MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder().setTxId(maxTxId);
-
- byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
- if (currentStat != null) {
- currentStat = zkc.setData(path, data, currentStat
- .getVersion());
- } else {
- zkc.create(path, data, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- throw new IOException("Error writing max tx id", e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while writing max tx id", e);
- }
- }
-
- synchronized long get() throws IOException {
- try {
- currentStat = zkc.exists(path, false);
- if (currentStat == null) {
- return 0;
- } else {
-
- byte[] bytes = zkc.getData(path, false, currentStat);
-
- MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
- TextFormat.merge(new String(bytes, UTF_8), builder);
- if (!builder.isInitialized()) {
- throw new IOException("Invalid/Incomplete data in znode");
- }
-
- return builder.build().getTxId();
- }
- } catch (KeeperException e) {
- throw new IOException("Error reading the max tx id from zk", e);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted while reading the max tx id", ie);
- }
- }
-}
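MaxTxId.store() above only writes when the new value exceeds the current one, so the recorded maximum transaction id can never move backwards. A minimal in-memory sketch of that store-if-greater contract (MonotonicMax is hypothetical; the real class persists the value as a text-format protobuf in ZooKeeper):

```java
// Sketch of MaxTxId's store-if-greater contract: the recorded value
// only advances, so out-of-order or concurrent writers cannot roll
// the max seen transaction id backwards.
class MonotonicMax {
  private long max = 0;

  synchronized void store(long txId) {
    if (txId > max) {   // mirrors MaxTxId.store(): only advance
      max = txId;
    }
  }

  synchronized long get() {
    return max;
  }

  public static void main(String[] args) {
    MonotonicMax m = new MonotonicMax();
    m.store(100);
    m.store(50);        // ignored: smaller than the current max
    m.store(150);
    assert m.get() == 150;
  }
}
```

The real implementation additionally tracks the znode Stat so the reset() write is version-checked, the same optimistic-concurrency guard CurrentInprogress uses.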
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
deleted file mode 100644
index 15fa479..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file contains protocol buffers that are used by bkjournal
-// mostly for storing data in zookeeper
-
-option java_package = "org.apache.hadoop.contrib.bkjournal";
-option java_outer_classname = "BKJournalProtos";
-option java_generate_equals_and_hash = true;
-package hadoop.hdfs;
-
-import "hdfs.proto";
-import "HdfsServer.proto";
-
-message VersionProto {
- required int32 layoutVersion = 1;
- optional NamespaceInfoProto namespaceInfo = 2;
-}
-
-message EditLogLedgerProto {
- required int32 dataLayoutVersion = 1;
- required int64 ledgerId = 2;
- required int64 firstTxId = 3;
- optional int64 lastTxId = 4;
-}
-
-message MaxTxIdProto {
- required int64 txId = 1;
-}
-
-message CurrentInprogressProto {
- required string path = 1;
- optional string hostname = 2;
-}
\ No newline at end of file
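Since BKJM stored these messages in znodes via protobuf TextFormat (see the TextFormat.merge/printToString calls in the deleted classes above), a finalized EditLogLedgerProto payload read back from ZooKeeper would have looked roughly like the following. The values are purely illustrative:

```
dataLayoutVersion: -60
ledgerId: 12
firstTxId: 1
lastTxId: 100
```

An in-progress segment simply omitted the optional lastTxId field, which is why EditLogLedgerMetadata.read() branches on ledger.hasLastTxId().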
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org