You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 14:09:17 UTC
[11/22] ignite git commit: IGNITE-2195: Implemented Hadoop FileSystem
factory capable of working with kerberized file systems. This closes #464.
IGNITE-2195: Implemented Hadoop FileSystem factory capable of working with kerberized file systems. This closes #464.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/008c8cd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/008c8cd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/008c8cd3
Branch: refs/heads/ignite-2407
Commit: 008c8cd3f33b9c2cace43a9d1f2b4e4542fb58fe
Parents: fa3706f
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Feb 10 15:45:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 10 15:45:58 2016 +0300
----------------------------------------------------------------------
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 22 +-
.../fs/CachingHadoopFileSystemFactory.java | 2 +-
.../fs/KerberosHadoopFileSystemFactory.java | 214 +++++++++++++++++++
...KerberosHadoopFileSystemFactorySelfTest.java | 104 +++++++++
.../testsuites/IgniteHadoopTestSuite.java | 3 +
5 files changed, 339 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index c791e9a..01fe6c9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -66,7 +66,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
/** {@inheritDoc} */
@Override public FileSystem get(String usrName) throws IOException {
- return create0(IgfsUtils.fixUserName(usrName));
+ return get0(IgfsUtils.fixUserName(usrName));
}
/**
@@ -76,7 +76,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
* @return File system.
* @throws IOException If failed.
*/
- protected FileSystem create0(String usrName) throws IOException {
+ protected FileSystem get0(String usrName) throws IOException {
assert cfg != null;
try {
@@ -87,12 +87,12 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
ClassLoader clsLdr = getClass().getClassLoader();
if (ctxClsLdr == clsLdr)
- return FileSystem.get(fullUri, cfg, usrName);
+ return create(usrName);
else {
Thread.currentThread().setContextClassLoader(clsLdr);
try {
- return FileSystem.get(fullUri, cfg, usrName);
+ return create(usrName);
}
finally {
Thread.currentThread().setContextClassLoader(ctxClsLdr);
@@ -107,6 +107,18 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
}
/**
+ * Internal file system creation routine, invoked in correct class loader context.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ * @throws InterruptedException if the current thread is interrupted.
+ */
+ protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ return FileSystem.get(fullUri, cfg, usrName);
+ }
+
+ /**
* Gets file system URI.
* <p>
* This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
@@ -152,7 +164,7 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
*
* @param cfgPaths Paths to file system configuration files.
*/
- public void setConfigPaths(String... cfgPaths) {
+ public void setConfigPaths(@Nullable String... cfgPaths) {
this.cfgPaths = cfgPaths;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index 91f7777..e1b30c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -47,7 +47,7 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
@Override public FileSystem createValue(String key) throws IOException {
- return create0(key);
+ return get0(key);
}
}
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
new file mode 100644
index 0000000..fc768d6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
+ * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
+ * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
+ * The principal and the key tab name to be used for Kerberos authentication are set explicitly
+ * in the factory configuration.
+ *
+ * <p>This factory does not cache any file system instances. If {@code "fs.[prefix].impl.disable.cache"} is set
+ * to {@code true}, file system instances will be cached by Hadoop.
+ */
+public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The default interval used to re-login from the key tab, in milliseconds. */
+ public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
+
+ /** Keytab full file name. */
+ private String keyTab;
+
+ /** Keytab principal. */
+ private String keyTabPrincipal;
+
+ /** The re-login interval. See {@link #getReloginInterval()} for more information. */
+ private long reloginInterval = DFLT_RELOGIN_INTERVAL;
+
+ /** Time of last re-login attempt, in system milliseconds. */
+ private transient volatile long lastReloginTime;
+
+ /**
+ * Constructor.
+ */
+ public KerberosHadoopFileSystemFactory() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String userName) throws IOException {
+ reloginIfNeeded();
+
+ return super.get(userName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
+ UserGroupInformation.getLoginUser());
+
+ return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override public FileSystem run() throws Exception {
+ return FileSystem.get(fullUri, cfg);
+ }
+ });
+ }
+
+ /**
+ * Gets the key tab principal short name (e.g. "hdfs").
+ *
+ * @return The key tab principal.
+ */
+ @Nullable public String getKeyTabPrincipal() {
+ return keyTabPrincipal;
+ }
+
+ /**
+ * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
+ *
+ * @param keyTabPrincipal The key tab principal name.
+ */
+ public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
+ this.keyTabPrincipal = keyTabPrincipal;
+ }
+
+ /**
+ * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
+ *
+ * @return The key tab file name.
+ */
+ @Nullable public String getKeyTab() {
+ return keyTab;
+ }
+
+ /**
+ * Sets the key tab file name. See {@link #getKeyTab()} for more information.
+ *
+ * @param keyTab The key tab file name.
+ */
+ public void setKeyTab(@Nullable String keyTab) {
+ this.keyTab = keyTab;
+ }
+
+ /**
+ * The interval used to re-login from the key tab, in milliseconds.
+ * Important that the value should not be larger than the Kerberos ticket life time multiplied by 0.2. This is
+ * because the ticket renew window starts from {@code 0.8 * ticket life time}.
+ * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
+ * is obeys this rule well.
+ *
+ * <p>Zero value means that re-login should be attempted on each file system operation.
+ * Negative values are not allowed.
+ *
+ * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
+ * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+ * have passed since the time of the previous login.
+ * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
+ * more detail.
+ *
+ * @return The re-login interval, in milliseconds.
+ */
+ public long getReloginInterval() {
+ return reloginInterval;
+ }
+
+ /**
+ * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
+ *
+ * @param reloginInterval The re-login interval, in milliseconds.
+ */
+ public void setReloginInterval(long reloginInterval) {
+ this.reloginInterval = reloginInterval;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
+ A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
+ A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
+
+ super.start();
+
+ try {
+ UserGroupInformation.setConfiguration(cfg);
+ UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
+ ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
+ }
+ }
+
+ /**
+ * Re-logins the user if needed.
+ * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
+ * frequent than one attempt per {@code reloginInterval}.
+ * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing
+ * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
+ *
+ * <p>This operation expected to be called upon each operation with the file system created with the factory.
+ * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
+ * is no need to invoke it otherwise specially.
+ *
+ * @throws IOException If login fails.
+ */
+ private void reloginIfNeeded() throws IOException {
+ long now = System.currentTimeMillis();
+
+ if (now >= lastReloginTime + reloginInterval) {
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+ lastReloginTime = now;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ U.writeString(out, keyTab);
+ U.writeString(out, keyTabPrincipal);
+ out.writeLong(reloginInterval);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ keyTab = U.readString(in);
+ keyTabPrincipal = U.readString(in);
+ reloginInterval = in.readLong();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
new file mode 100644
index 0000000..8fb1612
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -0,0 +1,104 @@
+package org.apache.ignite.hadoop.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests parameter validation and serialization of {@link KerberosHadoopFileSystemFactory}.
+ */
+public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
+ /**
+ * Tests that starting the factory fails for every combination with at least one invalid parameter.
+ *
+ * @throws Exception If failed.
+ */
+ public void testParameters() throws Exception {
+ checkParameters(null, null, -1);
+
+ checkParameters(null, null, 100);
+ checkParameters(null, "b", -1);
+ checkParameters("a", null, -1);
+
+ checkParameters(null, "b", 100);
+ checkParameters("a", null, 100);
+ checkParameters("a", "b", -1);
+ }
+
+ /**
+ * Checks that starting a factory configured with the given values throws {@link IllegalArgumentException}.
+ *
+ * @param keyTab Key tab.
+ * @param keyTabPrincipal Key tab principal.
+ * @param reloginInterval Re-login interval.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
+ final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+ fac.setKeyTab(keyTab);
+ fac.setKeyTabPrincipal(keyTabPrincipal);
+ fac.setReloginInterval(reloginInterval);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ fac.start();
+
+ return null;
+ }
+ }, IllegalArgumentException.class, null);
+ }
+
+ /**
+ * Checks serialization and deserialization of the secure factory, both with default and with explicit settings.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSerialization() throws Exception {
+ KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+ checkSerialization(fac);
+
+ fac = new KerberosHadoopFileSystemFactory();
+
+ fac.setUri("igfs://igfs@localhost:10500/");
+ fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
+ fac.setKeyTabPrincipal("foo");
+ fac.setKeyTab("/etc/krb5.keytab");
+ fac.setReloginInterval(30 * 60 * 1000L);
+
+ checkSerialization(fac);
+ }
+
+ /**
+ * Serializes the factory, deserializes it back and asserts that all configuration properties are preserved.
+ *
+ * @param fac The factory to check.
+ * @throws Exception If failed.
+ */
+ private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ ObjectOutput oo = new ObjectOutputStream(baos);
+
+ oo.writeObject(fac);
+
+ ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+ KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
+
+ assertEquals(fac.getUri(), fac2.getUri());
+ Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
+ assertEquals(fac.getKeyTab(), fac2.getKeyTab());
+ assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
+ assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/008c8cd3/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 9092f32..acd255c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
@@ -101,6 +102,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
+ suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName())));
+
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));