Posted to commits@ambari.apache.org by GitBox <gi...@apache.org> on 2018/11/15 04:41:15 UTC

[GitHub] kasakrisz closed pull request #14: AMBARI-24878 - Infra Manager: kerberos support

URL: https://github.com/apache/ambari-infra/pull/14

This is a PR merged from a forked repository. As GitHub hides the
original diff of a foreign (fork-based) pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index 0118c769..ddc4f000 100644
--- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -18,10 +18,16 @@
  */
 package org.apache.ambari.infra;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,15 +42,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 // TODO: use swagger
 public class InfraClient implements AutoCloseable {
@@ -96,6 +97,12 @@ public JobExecutionInfo startJob(String jobName, String parameters) {
     try {
       String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
       Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
+      if (!responseContent.containsKey("jobId"))
+        throw new NullPointerException("jobId is not found in start job responseContent");
+      if (!responseContent.containsKey("jobExecutionData"))
+        throw new NullPointerException("jobExecutionData is not found in start job responseContent");
+      if (!((Map)responseContent.get("jobExecutionData")).containsKey("id"))
+        throw new NullPointerException("id is not found in jobExecutionData");
       return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
     } catch (URISyntaxException | JsonParseException | JsonMappingException e) {
       throw new RuntimeException(e);
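
For reference, a minimal standalone sketch of the response check this hunk
adds, assuming Jackson 2.x on the classpath; the class name and sample
payload are illustrative, not part of the PR:

import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StartJobResponseCheckSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative payload in the shape the start-job endpoint returns.
    String responseText = "{\"jobId\":\"archive_solr\",\"jobExecutionData\":{\"id\":42}}";
    Map<String, Object> responseContent = new ObjectMapper()
        .readValue(responseText, new TypeReference<Map<String, Object>>() {});
    // Fail fast with a descriptive message instead of an opaque NPE from
    // the chained get(...) calls.
    if (!responseContent.containsKey("jobId"))
      throw new NullPointerException("jobId is not found in start job responseContent");
    if (!responseContent.containsKey("jobExecutionData"))
      throw new NullPointerException("jobExecutionData is not found in start job responseContent");
    Map<?, ?> executionData = (Map<?, ?>) responseContent.get("jobExecutionData");
    if (!executionData.containsKey("id"))
      throw new NullPointerException("id is not found in jobExecutionData");
    System.out.println("jobId=" + responseContent.get("jobId")
        + ", executionId=" + executionData.get("id"));
  }
}
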
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
index 6a36f724..5c783d6e 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
@@ -54,6 +54,8 @@
   @JsonSerialize(converter = FsPermissionToStringConverter.class)
   @JsonDeserialize(converter = StringToFsPermissionConverter.class)
   private FsPermission hdfsFilePermission;
+  private String hdfsKerberosPrincipal;
+  private String hdfsKerberosKeytabPath;
   private String start;
   private String end;
   @JsonSerialize(converter = DurationToStringConverter.class)
@@ -172,6 +174,22 @@ public void setHdfsFilePermission(FsPermission hdfsFilePermission) {
     this.hdfsFilePermission = hdfsFilePermission;
   }
 
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
   public Optional<S3Properties> s3Properties() {
     if (isBlank(s3BucketName))
       return Optional.empty();
@@ -183,6 +201,18 @@ public void setHdfsFilePermission(FsPermission hdfsFilePermission) {
             s3Endpoint));
   }
 
+  public Optional<HdfsProperties> hdfsProperties() {
+    if (isBlank(hdfsDestinationDirectory))
+      return Optional.empty();
+
+    return Optional.of(new HdfsProperties(
+            hdfsEndpoint,
+            hdfsDestinationDirectory,
+            hdfsFilePermission,
+            hdfsKerberosPrincipal,
+            hdfsKerberosKeytabPath));
+  }
+
   public String getStart() {
     return start;
   }
@@ -234,12 +264,9 @@ public void validate() {
         break;
 
       case HDFS:
-        if (isBlank(hdfsEndpoint))
-          throw new IllegalArgumentException(String.format(
-                  "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name()));
-        if (isBlank(hdfsDestinationDirectory))
-          throw new IllegalArgumentException(String.format(
-                  "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
+        hdfsProperties()
+                .orElseThrow(() -> new IllegalArgumentException("HDFS related properties must be set if the destination is " + HDFS.name()))
+                .validate();
     }
 
     requireNonNull(solr, "No solr query was specified for archiving job!");
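
For reference, a minimal sketch of how the new hdfsProperties()/validate()
pair replaces the inline isBlank checks, using the HdfsProperties class
added by this PR; the endpoint, principal, and keytab values are
illustrative:

import java.util.Optional;

import org.apache.ambari.infra.job.archive.HdfsProperties;

public class HdfsValidationSketch {
  public static void main(String[] args) {
    // hdfsProperties() returns empty when hdfsDestinationDirectory is
    // blank; otherwise the populated holder is validated in one place.
    Optional<HdfsProperties> hdfsProperties = Optional.of(new HdfsProperties(
        "hdfs://namenode:8020",                  // illustrative endpoint
        "/archive",                              // illustrative directory
        null,                                    // permission: defaults to 640
        "infra@EXAMPLE.COM",                     // illustrative principal
        "/etc/security/keytabs/infra.keytab"));  // illustrative keytab path

    hdfsProperties
        .orElseThrow(() -> new IllegalArgumentException(
            "HDFS related properties must be set if the destination is HDFS"))
        .validate(); // passes: directory set, principal and keytab both present
  }
}
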
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 85fb364d..af522d3b 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -32,7 +32,6 @@
 import org.apache.ambari.infra.job.JobContextRepository;
 import org.apache.ambari.infra.job.JobScheduler;
 import org.apache.ambari.infra.job.ObjectSource;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.batch.core.Job;
@@ -103,8 +102,8 @@ public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
         break;
       case HDFS:
         org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-        conf.set("fs.defaultFS", parameters.getHdfsEndpoint());
-        fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()), parameters.getHdfsFilePermission()));
+        fileAction.add(new HdfsUploader(conf,
+                parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
         break;
       case LOCAL:
         baseDir = new File(parameters.getLocalDestinationDirectory());
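
For reference, a sketch of the new wiring: fs.defaultFS is no longer set
by the configuration class; HdfsUploader applies the endpoint itself from
HdfsProperties (values illustrative, unsecured cluster assumed):

import org.apache.ambari.infra.job.archive.HdfsProperties;
import org.apache.ambari.infra.job.archive.HdfsUploader;
import org.apache.hadoop.conf.Configuration;

public class HdfsUploaderWiringSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The endpoint now travels inside HdfsProperties; the HdfsUploader
    // constructor sets fs.defaultFS when it is non-blank (see the
    // HdfsUploader diff further below).
    HdfsProperties properties = new HdfsProperties(
        "hdfs://namenode:8020", "/archive", null, null, null);
    new HdfsUploader(conf, properties);
  }
}
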
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index a5735623..8ad576c4 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -23,7 +23,6 @@
 import static org.apache.commons.lang.StringUtils.isBlank;
 
 import java.time.Duration;
-import java.util.Optional;
 
 import org.apache.ambari.infra.job.JobProperties;
 import org.apache.ambari.infra.json.DurationToStringConverter;
@@ -40,6 +39,7 @@
   private String fileNameSuffixDateFormat;
   private Duration ttl;
   private SolrProperties solr;
+
   private String s3AccessFile;
   private String s3KeyPrefix;
   private String s3BucketName;
@@ -48,6 +48,8 @@
   private String hdfsEndpoint;
   private String hdfsDestinationDirectory;
   private FsPermission hdfsFilePermission;
+  private String hdfsKerberosPrincipal;
+  private String hdfsKerberosKeytabPath;
 
   public int getReadBlockSize() {
     return readBlockSize;
@@ -145,17 +147,6 @@ public void setS3Endpoint(String s3Endpoint) {
     this.s3Endpoint = s3Endpoint;
   }
 
-  public Optional<S3Properties> s3Properties() {
-    if (isBlank(s3BucketName))
-      return Optional.empty();
-
-    return Optional.of(new S3Properties(
-            s3AccessFile,
-            s3KeyPrefix,
-            s3BucketName,
-            s3Endpoint));
-  }
-
   public String getHdfsEndpoint() {
     return hdfsEndpoint;
   }
@@ -180,6 +171,22 @@ public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) {
     this.hdfsDestinationDirectory = hdfsDestinationDirectory;
   }
 
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
   private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
     String valueText = jobParameters.getString(parameterName);
     if (isBlank(valueText))
@@ -203,6 +210,8 @@ public ArchivingParameters merge(JobParameters jobParameters) {
     archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint));
     archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory));
     archivingParameters.setHdfsFilePermission(toFsPermission(jobParameters.getString("hdfsFilePermission", FsPermissionToStringConverter.toString(hdfsFilePermission))));
+    archivingParameters.setHdfsKerberosPrincipal(jobParameters.getString("hdfsKerberosPrincipal", hdfsKerberosPrincipal));
+    archivingParameters.setHdfsKerberosKeytabPath(jobParameters.getString("hdfsKerberosKeytabPath", hdfsKerberosKeytabPath));
     archivingParameters.setSolr(solr.merge(jobParameters));
     archivingParameters.setStart(jobParameters.getString("start"));
     archivingParameters.setEnd(jobParameters.getString("end"));
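
For reference, a sketch of how a launch-time JobParameters entry overrides
the configured default in merge(), assuming Spring Batch on the classpath;
the parameter values are illustrative:

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;

public class MergeOverrideSketch {
  public static void main(String[] args) {
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("hdfsKerberosPrincipal", "infra@EXAMPLE.COM")
        .toJobParameters();
    // getString(key, default) returns the launch-time value when present,
    // the configured default otherwise - the pattern merge() relies on.
    String principal = jobParameters.getString(
        "hdfsKerberosPrincipal", "configured-default@EXAMPLE.COM");
    System.out.println(principal); // -> infra@EXAMPLE.COM
  }
}
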
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
new file mode 100644
index 00000000..da4137fe
--- /dev/null
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class HdfsProperties {
+  private static final String DEFAULT_FILE_PERMISSION = "640";
+
+  private final String hdfsEndpoint;
+  private final String hdfsDestinationDirectory;
+  private final FsPermission hdfsFilePermission;
+  private final String hdfsKerberosPrincipal;
+  private final String hdfsKerberosKeytabPath;
+
+  public HdfsProperties(String hdfsEndpoint, String hdfsDestinationDirectory, FsPermission hdfsFilePermission, String hdfsKerberosPrincipal, String hdfsKerberosKeytabPath) {
+    this.hdfsEndpoint = hdfsEndpoint;
+    this.hdfsDestinationDirectory = hdfsDestinationDirectory;
+    this.hdfsFilePermission = hdfsFilePermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : hdfsFilePermission;
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
+  public String getHdfsEndpoint() {
+    return hdfsEndpoint;
+  }
+
+  public String getHdfsDestinationDirectory() {
+    return hdfsDestinationDirectory;
+  }
+
+  public FsPermission getHdfsFilePermission() {
+    return hdfsFilePermission;
+  }
+
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  @Override
+  public String toString() {
+    return "HdfsProperties{" +
+            "hdfsEndpoint='" + hdfsEndpoint + '\'' +
+            ", hdfsDestinationDirectory='" + hdfsDestinationDirectory + '\'' +
+            ", hdfsFilePermission=" + hdfsFilePermission +
+            ", hdfsKerberosPrincipal='" + hdfsKerberosPrincipal + '\'' +
+            ", hdfsKerberosKeytabPath='" + hdfsKerberosKeytabPath + '\'' +
+            '}';
+  }
+
+  public void validate() {
+    if (isBlank(hdfsDestinationDirectory))
+      throw new IllegalArgumentException("The property hdfsDestinationDirectory can not be null or empty string!");
+
+    if (isNotBlank(hdfsKerberosPrincipal) && isBlank(hdfsKerberosKeytabPath))
+      throw new IllegalArgumentException("The property hdfsKerberosPrincipal is specified but hdfsKerberosKeytabPath is blank!");
+
+    if (isBlank(hdfsKerberosPrincipal) && isNotBlank(hdfsKerberosKeytabPath))
+      throw new IllegalArgumentException("The property hdfsKerberosKeytabPath is specified but hdfsKerberosPrincipal is blank!");
+  }
+}
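
For reference, validate() treats the Kerberos principal and the keytab
path as a pair; a sketch of the rejected half-configured case (values
illustrative):

import org.apache.ambari.infra.job.archive.HdfsProperties;

public class ValidatePairingSketch {
  public static void main(String[] args) {
    // Principal set, keytab missing: validate() rejects the pair.
    HdfsProperties halfConfigured = new HdfsProperties(
        null, "/archive", null, "infra@EXAMPLE.COM", null);
    try {
      halfConfigured.validate();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
      // -> The property hdfsKerberosPrincipal is specified but
      //    hdfsKerberosKeytabPath is blank!
    }
  }
}
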
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
index 469326fb..ff486738 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -25,31 +27,59 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
 
 public class HdfsUploader extends AbstractFileAction {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsUploader.class);
 
-  private static final String DEFAULT_FILE_PERMISSION = "640";
   private final Configuration configuration;
-  private final Path destinationDirectory;
-  private final FsPermission fsPermission;
+  private final HdfsProperties properties;
 
-  public HdfsUploader(Configuration configuration, Path destinationDirectory, FsPermission fsPermission) {
-    this.destinationDirectory = destinationDirectory;
+  public HdfsUploader(Configuration configuration, HdfsProperties properties) {
+    this.properties = properties;
     this.configuration = configuration;
-    this.fsPermission = fsPermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : fsPermission;
+
+    if (new ClassPathResource("core-site.xml").exists()) {
+      LOG.info("Hdfs core-site.xml is found in the classpath.");
+    }
+    else {
+      LOG.warn("Hdfs core-site.xml is not found in the classpath. Using defaults.");
+    }
+    if (new ClassPathResource("hdfs-site.xml").exists()) {
+      LOG.info("Hdfs hdfs-site.xml is found in the classpath.");
+    }
+    else {
+      LOG.warn("Hdfs hdfs-site.xml is not found in the classpath. Using defaults.");
+    }
+    if (isNotBlank(properties.getHdfsEndpoint())) {
+      LOG.info("Hdfs endpoint is defined in Infra Manager properties. Setting fs.defaultFS to {}", properties.getHdfsEndpoint());
+      this.configuration.set("fs.defaultFS", properties.getHdfsEndpoint());
+    }
+
+    UserGroupInformation.setConfiguration(configuration);
   }
 
   @Override
   protected File onPerform(File inputFile) {
+    try {
+      if ("kerberos".equalsIgnoreCase(configuration.get("hadoop.security.authentication")))
+        UserGroupInformation.loginUserFromKeytab(properties.getHdfsKerberosPrincipal(), properties.getHdfsKerberosKeytabPath());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
     try (FileSystem fileSystem = FileSystem.get(configuration)) {
-      Path destination = new Path(destinationDirectory, inputFile.getName());
+
+      Path destination = new Path(properties.getHdfsDestinationDirectory(), inputFile.getName());
       if (fileSystem.exists(destination)) {
         throw new UnsupportedOperationException(String.format("File '%s' already exists!", destination));
       }
 
       fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()), destination);
-      fileSystem.setPermission(destination, fsPermission);
+      fileSystem.setPermission(destination, properties.getHdfsFilePermission());
 
       return inputFile;
     }
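
For reference, a sketch of the keytab login pattern onPerform() uses,
assuming hadoop-common on the classpath; actually running it requires a
reachable KDC and a valid keytab (principal and path are illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // On a secured cluster core-site.xml supplies this value; it is set
    // explicitly here only to keep the sketch self-contained.
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);

    if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
      // Obtains a TGT that subsequent FileSystem calls reuse.
      UserGroupInformation.loginUserFromKeytab(
          "infra@EXAMPLE.COM", "/etc/security/keytabs/infra.keytab");
    }
  }
}
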
diff --git a/pom.xml b/pom.xml
index 29271c16..b6f52f9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
     <deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
     <deb.architecture>amd64</deb.architecture>
     <deb.dependency.list>${deb.python.ver}</deb.dependency.list>
-    <hadoop.version>3.0.0</hadoop.version>
+    <hadoop.version>3.1.1</hadoop.version>
     <surefire.argLine>-Xmx1024m -Xms512m</surefire.argLine>
     <zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
     <ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services