Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:00 UTC
[14/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
deleted file mode 100644
index 499cab9..0000000
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.core.util;
-
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.response.lineage.Edge;
-import org.apache.falcon.regression.core.response.lineage.EdgesResult;
-import org.apache.falcon.regression.core.response.lineage.GraphResult;
-import org.apache.falcon.regression.core.response.lineage.NODE_TYPE;
-import org.apache.falcon.regression.core.response.lineage.Vertex;
-import org.apache.falcon.regression.core.response.lineage.VerticesResult;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-
-/**
- * Util methods for graph asserts.
- */
-public final class GraphAssert {
- private GraphAssert() {
- throw new AssertionError("Instantiating utility class...");
- }
- private static final Logger LOGGER = Logger.getLogger(GraphAssert.class);
-
- /**
- * Check that the result has a certain minimum number of vertices.
- * @param graphResult the result to be checked
- * @param minNumOfVertices required number of vertices
- */
- public static void checkVerticesPresence(final GraphResult graphResult,
- final int minNumOfVertices) {
- Assert.assertTrue(graphResult.getTotalSize() >= minNumOfVertices,
- "graphResult should have at least " + minNumOfVertices + " vertex");
- }
-
- /**
- * Check that the vertices in the result are sane.
- * @param verticesResult the result to be checked
- */
- public static void assertVertexSanity(final VerticesResult verticesResult) {
- Assert.assertEquals(verticesResult.getResults().size(), verticesResult.getTotalSize(),
- "Size of vertices don't match");
- for (Vertex vertex : verticesResult.getResults()) {
- Assert.assertNotNull(vertex.getId(),
- "id of the vertex should be non-null: " + vertex);
- Assert.assertEquals(vertex.getNodeType(), NODE_TYPE.VERTEX,
- "_type of the vertex should be non-null: " + vertex);
- Assert.assertNotNull(vertex.getName(),
- "name of the vertex should be non-null: " + vertex);
- Assert.assertNotNull(vertex.getType(),
- "type of the vertex should be non-null: " + vertex);
- Assert.assertNotNull(vertex.getTimestamp(),
- "timestamp of the vertex should be non-null: " + vertex);
- }
- }
-
- /**
- * Check that edges in the result are sane.
- * @param edgesResult result to be checked
- */
- public static void assertEdgeSanity(final EdgesResult edgesResult) {
- Assert.assertEquals(edgesResult.getResults().size(), edgesResult.getTotalSize(),
- "Size of edges don't match");
- for (Edge edge : edgesResult.getResults()) {
- assertEdgeSanity(edge);
- }
- }
-
- /**
- * Check that edge is sane.
- * @param edge edge to be checked
- */
- public static void assertEdgeSanity(Edge edge) {
- Assert.assertNotNull(edge.getId(), "id of an edge can't be null: " + edge);
- Assert.assertEquals(edge.getNodeType(), NODE_TYPE.EDGE,
- "_type of an edge can't be null: " + edge);
- Assert.assertNotNull(edge.getLabel(), "_label of an edge can't be null: " + edge);
- Assert.assertTrue(edge.getInV() > 0, "_inV of an edge should be positive: " + edge);
- Assert.assertTrue(edge.getOutV() > 0, "_outV of an edge should be positive: " + edge);
- }
-
- /**
- * Check that user vertex is present.
- * @param verticesResult the result to be checked
- */
- public static void assertUserVertexPresence(final VerticesResult verticesResult) {
- checkVerticesPresence(verticesResult, 1);
- for(Vertex vertex : verticesResult.getResults()) {
- if (vertex.getType() == Vertex.VERTEX_TYPE.USER
- && vertex.getName().equals(MerlinConstants.CURRENT_USER_NAME)) {
- return;
- }
- }
- Assert.fail(String.format("Vertex corresponding to user: %s is not present.",
- MerlinConstants.CURRENT_USER_NAME));
- }
-
- /**
- * Check that a vertex of a certain name is present.
- * @param verticesResult the result to be checked
- * @param name expected name
- */
- public static void assertVertexPresence(final VerticesResult verticesResult, final String name) {
- checkVerticesPresence(verticesResult, 1);
- for (Vertex vertex : verticesResult.getResults()) {
- if (vertex.getName().equals(name)) {
- return;
- }
- }
- Assert.fail(String.format("Vertex of name: %s is not present.", name));
- }
-
- /**
- * Check that the result has at least a certain number of vertices of a certain type.
- * @param verticesResult the result to be checked
- * @param vertexType vertex type
- * @param minOccurrence required number of vertices
- */
- public static void assertVerticesPresenceMinOccur(final VerticesResult verticesResult,
- final Vertex.VERTEX_TYPE vertexType,
- final int minOccurrence) {
- int occurrence = 0;
- for(Vertex vertex : verticesResult.getResults()) {
- if (vertex.getType() == vertexType) {
- LOGGER.info("Found vertex: " + vertex);
- occurrence++;
- if (occurrence >= minOccurrence) {
- return;
- }
- }
- }
- Assert.fail(String.format("Expected at least %d vertices of type %s. But found only %d",
- minOccurrence, vertexType, occurrence));
- }
-
- /**
- * Check result to contain at least a certain number of edges of a certain type.
- * @param edgesResult result to be checked
- * @param edgeLabel edge label
- * @param minOccurrence required number of edges
- */
- public static void assertEdgePresenceMinOccur(final EdgesResult edgesResult,
- final Edge.LabelType edgeLabel,
- final int minOccurrence) {
- int occurrence = 0;
- for(Edge edge : edgesResult.getResults()) {
- if (edge.getLabel() == edgeLabel) {
- LOGGER.info("Found edge: " + edge);
- occurrence++;
- if (occurrence >= minOccurrence) {
- return;
- }
- }
- }
- Assert.fail(String.format("Expected at least %d vertices of type %s. But found only %d",
- minOccurrence, edgeLabel, occurrence));
- }
-}
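For reference, a minimal sketch of how the removed GraphAssert helper was typically driven from a regression test. The LineageFetcher interface below is a hypothetical stand-in for whatever helper actually fetches lineage results; only the GraphAssert calls reflect the removed API.

    import org.apache.falcon.regression.core.response.lineage.EdgesResult;
    import org.apache.falcon.regression.core.response.lineage.VerticesResult;
    import org.apache.falcon.regression.core.util.GraphAssert;

    public class GraphAssertUsageSketch {
        /** Hypothetical stand-in for the helper that queries the lineage API. */
        interface LineageFetcher {
            VerticesResult getAllVertices();
            EdgesResult getAllEdges();
        }

        void verifyLineage(LineageFetcher lineage) {
            VerticesResult vertices = lineage.getAllVertices();
            GraphAssert.assertVertexSanity(vertices);        // id, name, type, timestamp set on every vertex
            GraphAssert.checkVerticesPresence(vertices, 1);  // at least one vertex returned
            GraphAssert.assertUserVertexPresence(vertices);  // a vertex for CURRENT_USER_NAME exists

            EdgesResult edges = lineage.getAllEdges();
            GraphAssert.assertEdgeSanity(edges);             // id, label, _inV, _outV sane on every edge
        }
    }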
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
deleted file mode 100644
index 1b463cd..0000000
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.core.util;
-
-
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.testng.Assert;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Util methods for HCat.
- */
-public final class HCatUtil {
- private HCatUtil() {
- throw new AssertionError("Instantiating utility class...");
- }
-
- public static HCatClient getHCatClient(String hCatEndPoint, String hiveMetaStorePrinciple)
- throws HCatException {
- HiveConf hcatConf = new HiveConf();
- hcatConf.set("hive.metastore.local", "false");
- hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, hCatEndPoint);
- hcatConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, hiveMetaStorePrinciple);
- hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, MerlinConstants.IS_SECURE);
- hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
- hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
- HCatSemanticAnalyzer.class.getName());
- hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- return HCatClient.create(hcatConf);
- }
-
- public static void addPartitionsToTable(HCatClient clusterHC, List<String> partitions,
- List<String> partitionLocations, String partitionCol, String dbName, String tableName) throws HCatException {
- Assert.assertEquals(partitions.size(), partitionLocations.size(),
- "Number of locations is not same as number of partitions.");
- final List<HCatAddPartitionDesc> partitionDesc = new ArrayList<>();
- for (int i = 0; i < partitions.size(); ++i) {
- final String partition = partitions.get(i);
- final Map<String, String> onePartition = new HashMap<>();
- onePartition.put(partitionCol, partition);
- final String partitionLoc = partitionLocations.get(i);
- partitionDesc.add(HCatAddPartitionDesc.create(dbName, tableName, partitionLoc, onePartition).build());
- }
- clusterHC.addPartitions(partitionDesc);
- }
-
- @SuppressWarnings("deprecation")
- public static HCatFieldSchema getStringSchema(String fieldName, String comment) throws HCatException {
- return new HCatFieldSchema(fieldName, HCatFieldSchema.Type.STRING, comment);
- }
-}
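Similarly, a small sketch of the removed HCatUtil helper; the metastore endpoint, Kerberos principal, database and table names below are placeholders, not values taken from this commit.

    import java.util.Arrays;
    import org.apache.falcon.regression.core.util.HCatUtil;
    import org.apache.hive.hcatalog.api.HCatClient;

    public class HCatUtilUsageSketch {
        public static void main(String[] args) throws Exception {
            // Metastore endpoint and Kerberos principal are placeholder values.
            HCatClient client = HCatUtil.getHCatClient(
                    "thrift://metastore.example.com:9083", "hive/_HOST@EXAMPLE.COM");
            // Register two partitions of an assumed pre-existing table partitioned on 'dt'.
            HCatUtil.addPartitionsToTable(client,
                    Arrays.asList("2016-03-01", "2016-03-02"),
                    Arrays.asList("/data/in/2016-03-01", "/data/in/2016-03-02"),
                    "dt", "default", "demo_table");
        }
    }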
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
deleted file mode 100644
index a3b059e..0000000
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.core.util;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.Logger;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.regex.Pattern;
-
-/**
- * Util methods related to Hadoop.
- */
-public final class HadoopUtil {
-
- public static final String SOMETHING_RANDOM = "somethingRandom";
- private static final Logger LOGGER = Logger.getLogger(HadoopUtil.class);
- private static Pattern protocol = Pattern.compile(":[\\d]+/");
-
- private HadoopUtil() {
- throw new AssertionError("Instantiating utility class...");
- }
-
- /*
- * Removes 'hdfs(hftp)://server:port'
- */
- public static String cutProtocol(String path) {
- if (StringUtils.isNotEmpty(path)) {
- if (protocol.matcher(path).find()) {
- return '/' + protocol.split(path)[1];
- }
- }
- return path;
- }
-
- public static String joinPath(String basePath, String... restParts) {
- final String separator = "/";
- List<String> cleanParts = new ArrayList<>();
- String cleanBasePath = basePath.replaceFirst(separator + "$", "");
- cleanParts.add(cleanBasePath);
- for (String onePart : restParts) {
- final String cleanPart = onePart.replaceFirst("^" + separator, "").replaceFirst(separator + "$", "");
- cleanParts.add(cleanPart);
- }
- return StringUtils.join(cleanParts, separator);
- }
-
- /**
- * Retrieves all file names contained in a given directory.
- * @param fs filesystem
- * @param location given directory
- * @return list of file names
- * @throws IOException
- */
- public static List<String> getAllFilesHDFS(FileSystem fs, Path location) throws IOException {
- List<String> files = new ArrayList<>();
- if (!fs.exists(location)) {
- return files;
- }
- FileStatus[] stats = fs.listStatus(location);
- for (FileStatus stat : stats) {
- if (!isDir(stat)) {
- files.add(stat.getPath().toString());
- }
- }
- return files;
- }
-
- /**
- * Retrieves all directories within a given depth starting from a specific dir.
- * @param fs filesystem
- * @param location given dir
- * @param depth depth
- * @return all matching directories
- * @throws IOException
- */
- public static List<Path> getAllDirsRecursivelyHDFS(
- FileSystem fs, Path location, int depth) throws IOException {
- List<Path> returnList = new ArrayList<>();
- FileStatus[] stats = fs.listStatus(location);
- for (FileStatus stat : stats) {
- if (isDir(stat)) {
- returnList.add(stat.getPath());
- if (depth > 0) {
- returnList.addAll(getAllDirsRecursivelyHDFS(fs, stat.getPath(), depth - 1));
- }
- }
- }
- return returnList;
- }
-
- /**
- * Recursively retrieves all data file names from a given location.
- * @param fs filesystem
- * @param location given location
- * @return list of all files
- * @throws IOException
- */
- public static List<Path> getAllFilesRecursivelyHDFS(
- FileSystem fs, Path location) throws IOException {
- List<Path> returnList = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> remoteIterator;
- try {
- remoteIterator = fs.listFiles(location, true);
- } catch (FileNotFoundException e) {
- LOGGER.info("Path '" + location + "' is not found on " + fs.getUri());
- return returnList;
- }
- while(remoteIterator.hasNext()) {
- Path path = remoteIterator.next().getPath();
- if (!path.toUri().toString().contains("_SUCCESS")) {
- returnList.add(path);
- }
- }
- return returnList;
- }
-
- /**
- * Checks whether a given folder contains the availability flag file.
- * If availabilityFlag is empty then the default _SUCCESS file is looked for.
- * @param fs filesystem
- * @param location given location
- * @param availabilityFlag value of availability flag set in entity
- * @return true if the flag file is present, false otherwise
- * @throws IOException
- */
- public static boolean getSuccessFolder(
- FileSystem fs, Path location, String availabilityFlag) throws IOException {
- LOGGER.info("location : " + location);
- for(FileStatus stat : fs.listStatus(location)) {
- if (availabilityFlag.isEmpty()) {
- if (stat.getPath().getName().equals("_SUCCESS")) {
- return true;
- }
- } else {
- if (stat.getPath().getName().equals(availabilityFlag)) {
- return true;
- }
- }
- }
- return false;
- }
-
- @SuppressWarnings("deprecation")
- private static boolean isDir(FileStatus stat) {
- return stat.isDir();
- }
-
- /**
- * Copies file from local place to hdfs location.
- * @param fs target filesystem
- * @param dstHdfsDir destination
- * @param srcFileLocation source location
- * @throws IOException
- */
- public static void copyDataToFolder(final FileSystem fs, String dstHdfsDir,
- final String srcFileLocation)
- throws IOException {
- LOGGER.info(String.format("Copying local dir %s to hdfs location %s on %s",
- srcFileLocation, dstHdfsDir, fs.getUri()));
- fs.copyFromLocalFile(new Path(srcFileLocation), new Path(cutProtocol(dstHdfsDir)));
- }
-
- /**
- * Copies a whole directory to hdfs.
- * @param fs target filesystem
- * @param dstHdfsDir destination dir
- * @param localLocation source location
- * @throws IOException
- */
- public static void uploadDir(final FileSystem fs, final String dstHdfsDir,
- final String localLocation)
- throws IOException {
- LOGGER.info(String.format("Uploading local dir %s to hdfs location %s", localLocation,
- dstHdfsDir));
- HadoopUtil.deleteDirIfExists(dstHdfsDir, fs);
- HadoopUtil.copyDataToFolder(fs, dstHdfsDir, localLocation);
- }
-
- /**
- * Copies given data to hdfs location.
- * @param fs target filesystem
- * @param dstHdfsDir destination dir
- * @param data source location
- * @param overwrite do we want to overwrite the data
- * @throws IOException
- */
- public static void writeDataForHive(final FileSystem fs, final String dstHdfsDir,
- final CharSequence data, boolean overwrite) throws IOException {
- LOGGER.info(String.format("Writing data %s to hdfs location %s", data, dstHdfsDir));
- final File tempFile = File.createTempFile(UUID.randomUUID().toString().split("-")[0], ".dat");
- FileUtils.write(tempFile, data);
- if (overwrite) {
- HadoopUtil.deleteDirIfExists(dstHdfsDir, fs);
- }
- try {
- fs.mkdirs(new Path(dstHdfsDir));
- } catch (Exception e) {
- //ignore
- }
- fs.setPermission(new Path(dstHdfsDir), FsPermission.getDirDefault());
- HadoopUtil.copyDataToFolder(fs, dstHdfsDir, tempFile.getAbsolutePath());
- if (!tempFile.delete()) {
- LOGGER.warn("Deletion of " + tempFile + " failed.");
- }
- }
-
- /**
- * Lists names of given directory subfolders.
- * @param fs filesystem
- * @param baseDir given directory
- * @return list of subfolders
- * @throws IOException
- */
- public static List<String> getHDFSSubFoldersName(FileSystem fs,
- String baseDir) throws IOException {
- List<String> returnList = new ArrayList<>();
- FileStatus[] stats = fs.listStatus(new Path(baseDir));
- for (FileStatus stat : stats) {
- if (isDir(stat)) {
- returnList.add(stat.getPath().getName());
- }
- }
- return returnList;
- }
-
- /**
- * Checks if file is present in given directory.
- * @param fs filesystem
- * @param hdfsPath path to a given directory
- * @param fileToCheckFor file
- * @return either file present or not
- * @throws IOException
- */
- public static boolean isFilePresentHDFS(FileSystem fs, String hdfsPath, String fileToCheckFor)
- throws IOException {
- LOGGER.info("getting file from folder: " + hdfsPath);
- List<String> fileNames = getAllFileNamesFromHDFS(fs, hdfsPath);
- for (String filePath : fileNames) {
- if (filePath.contains(fileToCheckFor)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Lists all file names for a given directory.
- * @param fs filesystem
- * @param hdfsPath path to a given directory
- * @return list of files which given directory contains
- * @throws IOException
- */
- private static List<String> getAllFileNamesFromHDFS(
- FileSystem fs, String hdfsPath) throws IOException {
- List<String> returnList = new ArrayList<>();
- LOGGER.info("getting file from folder: " + hdfsPath);
- FileStatus[] stats = fs.listStatus(new Path(hdfsPath));
- for (FileStatus stat : stats) {
- String currentPath = stat.getPath().toUri().getPath(); // gives directory name
- if (!isDir(stat)) {
- returnList.add(currentPath);
- }
- }
- return returnList;
- }
-
- /**
- * Removes directory with a given name and creates empty one with the same name.
- * @param fs filesystem
- * @param path path to a directory
- * @throws IOException
- */
- public static void recreateDir(FileSystem fs, String path) throws IOException {
- deleteDirIfExists(path, fs);
- LOGGER.info("creating hdfs dir: " + path + " on " + fs.getConf().get("fs.default.name"));
- fs.mkdirs(new Path(path));
- }
-
- /**
- * Recreates dirs for a list of filesystems.
- * @param fileSystems list of filesystems
- * @param path path to a directory
- * @throws IOException
- */
- public static void recreateDir(List<FileSystem> fileSystems, String path) throws IOException {
- for (FileSystem fs : fileSystems) {
- recreateDir(fs, path);
- }
- }
-
- /**
- * Removes given directory from a filesystem.
- * @param hdfsPath path to a given directory
- * @param fs filesystem
- * @throws IOException
- */
- public static void deleteDirIfExists(String hdfsPath, FileSystem fs) throws IOException {
- Path path = new Path(hdfsPath);
- if (fs.exists(path)) {
- LOGGER.info(String.format("Deleting HDFS path: %s on %s", path, fs.getUri()));
- fs.delete(path, true);
- } else {
- LOGGER.info(String.format(
- "Not deleting non-existing HDFS path: %s on %s", path, fs.getUri()));
- }
- }
-
- /**
- * Copies data in folders without prefix.
- * @param fs filesystem
- * @param inputPath source location
- * @param remoteLocations destination location
- * @throws IOException
- */
- public static void flattenAndPutDataInFolder(FileSystem fs, String inputPath,
- List<String> remoteLocations) throws IOException {
- flattenAndPutDataInFolder(fs, inputPath, "", remoteLocations);
- }
-
- /**
- * Copies files from a source directory to target directories on hdfs.
- * @param fs target filesystem
- * @param inputPath source location
- * @param remotePathPrefix prefix for target directories
- * @param remoteLocations target directories
- * @return list of exact locations where data was copied
- * @throws IOException
- */
- public static List<String> flattenAndPutDataInFolder(FileSystem fs, String inputPath,
- String remotePathPrefix,
- List<String> remoteLocations) throws IOException {
- if (StringUtils.isNotEmpty(remotePathPrefix)) {
- deleteDirIfExists(remotePathPrefix, fs);
- }
- LOGGER.info("Creating data in folders: \n" + remoteLocations);
- File input = new File(inputPath);
- File[] files = input.isDirectory() ? input.listFiles() : new File[]{input};
- List<Path> filePaths = new ArrayList<>();
- assert files != null;
- for (final File file : files) {
- if (!file.isDirectory()) {
- final Path filePath = new Path(file.getAbsolutePath());
- filePaths.add(filePath);
- }
- }
- if (!remotePathPrefix.endsWith("/") && !remoteLocations.get(0).startsWith("/")) {
- remotePathPrefix += "/";
- }
- List<String> locations = new ArrayList<>();
- for (String remoteDir : remoteLocations) {
- String remoteLocation = remotePathPrefix + remoteDir;
- remoteLocation = cutProtocol(remoteLocation);
- locations.add(remoteLocation);
- LOGGER.info(String.format("copying to: %s files: %s",
- fs.getUri() + remoteLocation, Arrays.toString(files)));
- if (!fs.exists(new Path(remoteLocation))) {
- fs.mkdirs(new Path(remoteLocation));
- }
- fs.copyFromLocalFile(false, true, filePaths.toArray(new Path[filePaths.size()]),
- new Path(remoteLocation));
- }
- return locations;
- }
-
- /**
- * Copies data from local sources to remote directories.
- * @param fs target filesystem
- * @param folderPrefix prefix for remote directories
- * @param folderList remote directories
- * @param fileLocations sources
- * @throws IOException
- */
- public static void copyDataToFolders(FileSystem fs, final String folderPrefix,
- List<String> folderList, String... fileLocations) throws IOException {
- for (final String folder : folderList) {
- String folderSpace = folder.replaceAll("/", "_");
- File file = new File(OSUtil.NORMAL_INPUT + folderSpace + ".txt");
- FileUtils.writeStringToFile(file, "folder", true);
- fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(folderPrefix + folder));
- if (!file.delete()) {
- LOGGER.info("delete was not successful for file: " + file);
- }
- Path[] srcPaths = new Path[fileLocations.length];
- for (int i = 0; i < srcPaths.length; ++i) {
- srcPaths[i] = new Path(fileLocations[i]);
- }
- LOGGER.info(String.format("copying %s to %s%s on %s", Arrays.toString(srcPaths),
- folderPrefix, folder, fs.getUri()));
- fs.copyFromLocalFile(false, true, srcPaths, new Path(folderPrefix + folder));
- }
- }
-
- /**
- * Uploads data to remote directories with names within date ranges.
- * @param fs target filesystem
- * @param interval dates ranges before and after current date
- * @param minuteSkip time to skip within a range to get intermediate directories
- * @param folderPrefix prefix for remote directories
- * @throws IOException
- */
- public static void lateDataReplenish(FileSystem fs, int interval,
- int minuteSkip, String folderPrefix) throws IOException {
- List<String> folderData = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
- folderData.add(SOMETHING_RANDOM);
- flattenAndPutDataInFolder(fs, OSUtil.NORMAL_INPUT, folderPrefix, folderData);
- }
-
- /**
- * Creates list of folders on remote filesystem.
- * @param fs remote filesystem
- * @param folderPrefix prefix for remote directories
- * @param folderList list of folders
- * @throws IOException
- */
- public static void createFolders(FileSystem fs, final String folderPrefix,
- List<String> folderList) throws IOException {
- for (final String folder : folderList) {
- final String pathString = cutProtocol(folderPrefix + folder);
- LOGGER.info("Creating " + fs.getUri() + "/" + pathString);
- fs.mkdirs(new Path(pathString));
- }
- }
-
- /**
- * Creates folders in a remote location according to the current time and copies files there.
- * @param fs target filesystem
- * @param remoteLocation remote location
- * @param localLocation source
- * @throws IOException
- */
- public static void injectMoreData(FileSystem fs, final String remoteLocation,
- String localLocation) throws IOException {
- File[] files = new File(localLocation).listFiles();
- assert files != null;
- for (final File file : files) {
- if (!file.isDirectory()) {
- String path = remoteLocation + "/" + System.currentTimeMillis() / 1000 + "/";
- LOGGER.info("inserting data@ " + path);
- fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(path));
- }
- }
-
- }
-
- /**
- * Uploads either _SUCCESS or dataFile4.txt file to remote directories with names within date
- * ranges.
- * @param fs target filesystem
- * @param interval dates ranges before and after current date
- * @param minuteSkip time to skip within a range to get intermediate directories
- * @param folderPrefix prefix for remote directories
- * @param fileToBePut what file to copy to remote locations
- * @throws IOException
- */
- public static void putFileInFolderHDFS(FileSystem fs, int interval, int minuteSkip,
- String folderPrefix, String fileToBePut)
- throws IOException {
- List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
- LOGGER.info("folderData: " + folderPaths.toString());
- createFolders(fs, folderPrefix, folderPaths);
- if (fileToBePut.equals("_SUCCESS")) {
- copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.concat(OSUtil.NORMAL_INPUT, "_SUCCESS"));
- } else {
- copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile4.txt"));
- }
- }
-
- /**
- * Uploads dataFile4.txt file to remote directories with names within date ranges.
- * @param fs target filesystem
- * @param interval dates ranges before and after current date
- * @param minuteSkip time to skip within a range to get intermediate directories
- * @param folderPrefix prefix for remote directories
- * @param postFix postfix for remote locations
- * @throws IOException
- */
- public static void lateDataReplenishWithoutSuccess(FileSystem fs, int interval,
- int minuteSkip, String folderPrefix, String postFix) throws IOException {
- List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
- LOGGER.info("folderData: " + folderPaths.toString());
- if (postFix != null) {
- for (int i = 0; i < folderPaths.size(); i++) {
- folderPaths.set(i, folderPaths.get(i) + postFix);
- }
- }
- createFolders(fs, folderPrefix, folderPaths);
- copyDataToFolders(fs, folderPrefix, folderPaths, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile4.txt"));
- }
-
- /**
- * Uploads both dataFile4.txt and _SUCCESS files to remote directories with names within date
- * ranges.
- * @param fs target filesystem
- * @param interval dates ranges before and after current date
- * @param minuteSkip time to skip within a range to get intermediate directories
- * @param folderPrefix prefix for remote directories
- * @param postFix postfix for remote locations
- * @throws IOException
- */
- public static void lateDataReplenish(FileSystem fs, int interval, int minuteSkip,
- String folderPrefix, String postFix) throws IOException {
- List<String> folderPaths = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
- LOGGER.info("folderData: " + folderPaths.toString());
- if (postFix != null) {
- for (int i = 0; i < folderPaths.size(); i++) {
- folderPaths.set(i, folderPaths.get(i) + postFix);
- }
- }
- createFolders(fs, folderPrefix, folderPaths);
- copyDataToFolders(fs, folderPrefix, folderPaths,
- OSUtil.concat(OSUtil.NORMAL_INPUT, "_SUCCESS"),
- OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile4.txt"));
- }
-
- /**
- * Creates empty folders in hdfs.
- * @param helper target
- * @param folderList list of folders
- * @throws IOException
- * @deprecated method creates filesystem object by itself. We should pass existing FileSystem
- * object to such methods.
- */
- @Deprecated
- public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
- throws IOException {
- LOGGER.info("creating folders.....");
- Configuration conf = new Configuration();
- conf.set("fs.default.name", "hdfs://" + helper.getFeedHelper().getHadoopURL());
- final FileSystem fs = FileSystem.get(conf);
- for (final String folder : folderList) {
- if (StringUtils.isNotEmpty(folder)) {
- fs.mkdirs(new Path(cutProtocol(folder)));
- }
- }
- LOGGER.info("created folders.....");
- }
-}
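A brief sketch of typical HadoopUtil usage; the namenode URI and the local and HDFS paths are placeholders.

    import org.apache.falcon.regression.core.util.HadoopUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class HadoopUtilUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://namenode.example.com:8020"); // placeholder namenode
            FileSystem fs = FileSystem.get(conf);

            String baseDir = HadoopUtil.joinPath("/tmp/falcon-regression", "input");
            HadoopUtil.recreateDir(fs, baseDir);                             // drop and recreate the dir
            HadoopUtil.copyDataToFolder(fs, baseDir, "/local/data/dataFile.txt");
            boolean present = HadoopUtil.isFilePresentHDFS(fs, baseDir, "dataFile.txt");
            System.out.println("dataFile.txt present: " + present);
        }
    }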
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveAssert.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveAssert.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveAssert.java
deleted file mode 100644
index 2a934b5..0000000
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveAssert.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.core.util;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.api.HCatDatabase;
-import org.apache.hive.hcatalog.api.HCatPartition;
-import org.apache.hive.hcatalog.api.HCatTable;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.log4j.Logger;
-import org.testng.asserts.SoftAssert;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/** Assertions for Hive objects. */
-public final class HiveAssert {
- private HiveAssert() {
- throw new AssertionError("Instantiating utility class...");
- }
-
- private static final Logger LOGGER = Logger.getLogger(HiveAssert.class);
-
- /**
- * Assertion for column equality - it also covers stuff that is not covered by
- * HCatFieldSchema.equals().
- * @param columns1 first column for comparison
- * @param columns2 second column for comparison
- * @param softAssert object to use for performing assertion
- * @return object used for performing assertion
- */
- public static SoftAssert assertColumnListEqual(List<HCatFieldSchema> columns1,
- List<HCatFieldSchema> columns2,
- SoftAssert softAssert) {
- softAssert.assertEquals(columns1, columns2, "Lists of columns for the two tables are not the same");
- for (int i = 0; i < columns1.size(); ++i) {
- HCatFieldSchema column1 = columns1.get(i);
- HCatFieldSchema column2 = columns2.get(i);
- softAssert.assertEquals(column2.getComment(), column1.getComment(),
- "Comments of the columns: " + column1 + " & " + column2 + " is not same");
- }
- return softAssert;
- }
-
- /**
- * Assertion for equality of partitions - equality using HCatPartition.equals() is not
- * satisfactory for our purpose.
- * @param table1Partitions first list of partitions for comparison
- * @param table2Partitions second list of partitions for comparison
- * @param softAssert object to use for performing assertion
- * @return object used for performing assertion
- */
- public static SoftAssert assertPartitionListEqual(List<HCatPartition> table1Partitions,
- List<HCatPartition> table2Partitions, SoftAssert softAssert) {
- softAssert.assertEquals(table1Partitions.size(), table2Partitions.size(),
- "Number of partitions are not same");
- try {
- for (int i = 0; i < table1Partitions.size(); i++) {
- final HCatPartition table1Partition = table1Partitions.get(i);
- final HCatPartition table2Partition = table2Partitions.get(i);
- softAssert.assertEquals(table2Partition.getValues(), table1Partition.getValues(),
- "Partitions don't have same values");
- }
- } catch (Exception e) {
- softAssert.fail("Couldn't do partition equality.", e);
- }
- return softAssert;
- }
-
- /**
- * Assertion for equality of two tables (including table properties and table type).
- * @param cluster1 the ColoHelper of first cluster
- * @param table1 the first table
- * @param cluster2 the ColoHelper of second cluster
- * @param table2 the second table
- * @param softAssert object used for performing assertion
- * @return object used for performing assertion
- * @throws java.io.IOException
- */
- public static SoftAssert assertTableEqual(ColoHelper cluster1, HCatTable table1,
- ColoHelper cluster2, HCatTable table2,
- SoftAssert softAssert) throws IOException {
- return assertTableEqual(cluster1, table1, cluster2, table2, softAssert, true);
- }
-
- /**
- * Assertion for equality of two tables.
- * @param cluster1 the ColoHelper of first cluster
- * @param table1 the first table (expected values)
- * @param cluster2 the ColoHelper of second cluster
- * @param table2 the second table (actual values)
- * @param softAssert object used for performing assertion
- * @return object used for performing assertion
- * @throws java.io.IOException
- */
- public static SoftAssert assertTableEqual(ColoHelper cluster1, HCatTable table1,
- ColoHelper cluster2, HCatTable table2,
- SoftAssert softAssert,
- boolean notIgnoreTblTypeAndProps) throws IOException {
- FileSystem cluster1FS = cluster1.getClusterHelper().getHadoopFS();
- FileSystem cluster2FS = cluster2.getClusterHelper().getHadoopFS();
- final String table1FullName = table1.getDbName() + "." + table1.getTableName();
- final String table2FullName = table2.getDbName() + "." + table2.getTableName();
- LOGGER.info("Checking equality of table : " + table1FullName + " & " + table2FullName);
- //table metadata equality
- softAssert.assertEquals(table2.comment(), table1.comment(),
- "Table " + table1FullName + " has different comment from " + table2FullName);
- softAssert.assertEquals(table2.getBucketCols(), table1.getBucketCols(),
- "Table " + table1FullName + " has different bucket columns from " + table2FullName);
- assertColumnListEqual(table1.getCols(), table2.getCols(), softAssert);
- softAssert.assertEquals(table2.getNumBuckets(), table1.getNumBuckets(),
- "Table " + table1FullName + " has different number of buckets from " + table2FullName);
- assertColumnListEqual(table1.getPartCols(), table2.getPartCols(), softAssert);
- softAssert.assertEquals(table2.getSerdeParams(), table1.getSerdeParams(),
- "Table " + table1FullName + " has different serde params from " + table2FullName);
- softAssert.assertEquals(table2.getSortCols(), table1.getSortCols(),
- "Table " + table1FullName + " has different sort columns from " + table2FullName);
- softAssert.assertEquals(table2.getStorageHandler(), table1.getStorageHandler(),
- "Table " + table1FullName + " has different storage handler from " + table2FullName);
- if (notIgnoreTblTypeAndProps) {
- softAssert.assertEquals(table2.getTabletype(), table1.getTabletype(),
- "Table " + table1FullName + " has different Tabletype from " + table2FullName);
- }
- final Map<String, String> tbl1Props = table1.getTblProps();
- final Map<String, String> tbl2Props = table2.getTblProps();
- final String[] ignoreTblProps = {"transient_lastDdlTime", "repl.last.id",
- "last_modified_by", "last_modified_time", "COLUMN_STATS_ACCURATE", };
- for (String ignoreTblProp : ignoreTblProps) {
- tbl1Props.remove(ignoreTblProp);
- tbl2Props.remove(ignoreTblProp);
- }
- final String[] ignoreDefaultProps = {"numRows", "rawDataSize"};
- for (String ignoreProp : ignoreDefaultProps) {
- if ("-1".equals(tbl1Props.get(ignoreProp))) {
- tbl1Props.remove(ignoreProp);
- }
- if ("-1".equals(tbl2Props.get(ignoreProp))) {
- tbl2Props.remove(ignoreProp);
- }
- }
-
- if (notIgnoreTblTypeAndProps) {
- softAssert.assertEquals(tbl2Props, tbl1Props,
- "Table " + table1FullName + " has different TblProps from " + table2FullName);
- }
- LOGGER.info("Checking equality of table partitions");
- HCatClient hcatClient1 = cluster1.getClusterHelper().getHCatClient();
- HCatClient hcatClient2 = cluster2.getClusterHelper().getHCatClient();
- final List<HCatPartition> table1Partitions =
- hcatClient1.getPartitions(table1.getDbName(), table1.getTableName());
- final List<HCatPartition> table2Partitions =
- hcatClient2.getPartitions(table2.getDbName(), table2.getTableName());
- assertPartitionListEqual(table1Partitions, table2Partitions, softAssert);
- if (notIgnoreTblTypeAndProps) {
- softAssert.assertEquals(
- cluster2FS.getContentSummary(new Path(table2.getLocation())).getLength(),
- cluster1FS.getContentSummary(new Path(table1.getLocation())).getLength(),
- "Size of content for table1 and table2 are different");
- }
-
- //table content equality
- LOGGER.info("Checking equality of table contents");
- Statement jdbcStmt1 = null, jdbcStmt2 = null;
- try {
- final boolean execute1;
- final boolean execute2;
- jdbcStmt1 = cluster1.getClusterHelper().getHiveJdbcConnection().createStatement();
- jdbcStmt2 = cluster2.getClusterHelper().getHiveJdbcConnection().createStatement();
- execute1 = jdbcStmt1.execute("select * from " + table1FullName);
- execute2 = jdbcStmt2.execute("select * from " + table2FullName);
- softAssert.assertEquals(execute2, execute1,
- "Table " + table1FullName + " has different result of select * from " + table2FullName);
- if (execute1 && execute2) {
- final ResultSet resultSet1 = jdbcStmt1.getResultSet();
- final ResultSet resultSet2 = jdbcStmt2.getResultSet();
- final List<String> rows1 = HiveUtil.fetchRows(resultSet1);
- final List<String> rows2 = HiveUtil.fetchRows(resultSet2);
- softAssert.assertEquals(rows2, rows1,
- "Table " + table1FullName + " has different content from " + table2FullName);
- }
- } catch (SQLException e) {
- softAssert.fail("Comparison of content of table " + table1FullName
- + " with content of table " + table2FullName + " failed because of exception\n"
- + ExceptionUtils.getFullStackTrace(e));
- } finally {
- if (jdbcStmt1 != null) {
- try {
- jdbcStmt1.close();
- } catch (SQLException e) {
- LOGGER.warn("Closing of jdbcStmt1 failed: " + ExceptionUtils.getFullStackTrace(e));
- }
- }
- if (jdbcStmt2 != null) {
- try {
- jdbcStmt2.close();
- } catch (SQLException e) {
- LOGGER.warn("Closing of jdbcStmt2 failed: " + ExceptionUtils.getFullStackTrace(e));
- }
- }
- }
- return softAssert;
- }
-
- /**
- * Assertion for equality of two dbs.
- * @param cluster1 the ColoHelper of first cluster
- * @param db1 first database for comparison (expected values)
- * @param cluster2 the ColoHelper of second cluster
- * @param db2 second database for comparison (actual values)
- * @param softAssert object used for performing assertion
- * @return object used for performing assertion
- * @throws java.io.IOException
- */
- public static SoftAssert assertDbEqual(ColoHelper cluster1, HCatDatabase db1,
- ColoHelper cluster2, HCatDatabase db2,
- SoftAssert softAssert) throws IOException {
- HCatClient hcatClient1 = cluster1.getClusterHelper().getHCatClient();
- HCatClient hcatClient2 = cluster2.getClusterHelper().getHCatClient();
- //check database name equality
- final String db1Name = db1.getName();
- final String db2Name = db2.getName();
- softAssert.assertEquals(db2.getComment(), db1.getComment(), "Comment differ for the dbs");
- //check database properties equality
- softAssert.assertEquals(db2.getProperties(), db1.getProperties(),
- "Database " + db1Name + " has different properties from " + db2Name);
- //checking table equality
- final List<String> db1tableNames = hcatClient1.listTableNamesByPattern(db1Name, ".*");
- final List<String> db2tableNames = hcatClient2.listTableNamesByPattern(db2Name, ".*");
- Collections.sort(db1tableNames);
- Collections.sort(db2tableNames);
- softAssert.assertEquals(db2tableNames, db1tableNames,
- "Table names are not same. Actual: " + db1tableNames + " Expected: " + db2tableNames);
- for (String tableName : db1tableNames) {
- try {
- assertTableEqual(cluster1, hcatClient1.getTable(db1Name, tableName),
- cluster2, hcatClient2.getTable(db2Name, tableName), softAssert);
- } catch (HCatException e) {
- softAssert.fail("Table equality check threw exception.", e);
- }
- }
- return softAssert;
- }
-}
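A sketch of how HiveAssert was typically used to compare a replicated database across two clusters. ColoHelper construction is environment-specific and omitted, and the database name is a placeholder.

    import org.apache.falcon.regression.core.helpers.ColoHelper;
    import org.apache.falcon.regression.core.util.HiveAssert;
    import org.apache.hive.hcatalog.api.HCatClient;
    import org.testng.asserts.SoftAssert;

    public class HiveAssertUsageSketch {
        void verifyReplication(ColoHelper sourceColo, ColoHelper targetColo) throws Exception {
            HCatClient sourceHC = sourceColo.getClusterHelper().getHCatClient();
            HCatClient targetHC = targetColo.getClusterHelper().getHCatClient();
            SoftAssert softAssert = new SoftAssert();
            // Compare the whole database (metadata, partitions and contents) on both clusters.
            HiveAssert.assertDbEqual(sourceColo, sourceHC.getDatabase("demo_db"),
                    targetColo, targetHC.getDatabase("demo_db"), softAssert);
            softAssert.assertAll();  // report all collected mismatches at once
        }
    }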
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveUtil.java
deleted file mode 100644
index 293a210..0000000
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HiveUtil.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.core.util;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Utility class for stuff related to Hive. All the methods in this class assume that they are
- * dealing with a small dataset.
- */
-public final class HiveUtil {
-
- private HiveUtil() {
- throw new AssertionError("Instantiating utility class...");
- }
- private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
-
- private static final Logger LOGGER = Logger.getLogger(HiveUtil.class);
-
- public static Connection getHiveJdbcConnection(final String jdbcUrl, final String user, final String password,
- final String hivePrincipal)
- throws ClassNotFoundException, SQLException, IOException, InterruptedException {
- final String transportMode = new HiveConf().get("hive.server2.transport.mode", "binary");
- String connectionStringSuffix = "";
- if (transportMode.equalsIgnoreCase("http")) {
- connectionStringSuffix += "transportMode=http;httpPath=cliservice;";
- }
- if (MerlinConstants.IS_SECURE) {
- connectionStringSuffix += String.format("principal=%s;kerberosAuthType=fromSubject;", hivePrincipal);
- }
- final String connectionStringSuffix2 = connectionStringSuffix;
- final UserGroupInformation ugi = KerberosHelper.getUGI(user);
- final Connection conn = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
- @Override
- public Connection run() throws Exception {
- Class.forName(DRIVER_NAME);
- return DriverManager.getConnection(jdbcUrl + "/;" + connectionStringSuffix2, ugi.getShortUserName(),
- password);
- }
- });
-
- return conn;
- }
-
- /**
- * Fetch rows from a given ResultSet and convert them into a list of strings, each string being
- * comma-separated column values. The output also has a header with column names and a footer with
- * the number of rows returned.
- * @param rs result set
- * @return list of strings - each string corresponds to the output that you would get at the
- * sql prompt
- * @throws SQLException
- */
- public static List<String> fetchRows(ResultSet rs) throws SQLException {
- ResultSetMetaData metaData = rs.getMetaData();
- List<String> output = new ArrayList<String>();
-
- int numberOfColumns = metaData.getColumnCount();
- StringBuilder sbCol = new StringBuilder();
- for (int i = 1; i <= numberOfColumns; i++) {
- if (i > 1) {
- sbCol.append(",");
- }
- String columnName = metaData.getColumnName(i);
- // the column name looks like tab1.col1
- // we want to remove table name else table equality will fail
- if (columnName.contains(".")) {
- columnName = columnName.split("\\.")[1];
- }
- sbCol.append("'").append(columnName).append("'");
- }
- LOGGER.info(sbCol.toString());
- output.add(sbCol.toString());
-
- int numberOfRows = 0;
- while (rs.next()) {
- StringBuilder sbVal = new StringBuilder();
- numberOfRows++;
- for (int i = 1; i <= numberOfColumns; i++) {
- if (i > 1) {
- sbVal.append(",");
- }
- String columnValue = rs.getString(i);
- sbVal.append("'").append(columnValue != null ? columnValue : "").append("'");
- }
- LOGGER.info(sbVal.toString());
- output.add(sbVal.toString());
- }
- Collections.sort(output); //sorting to ensure stable results across different runs
- String rowStr = (numberOfRows > 0 ? numberOfRows : "No")
- + (numberOfRows == 1 ? " row" : " rows") + " selected";
- LOGGER.info(rowStr);
- output.add(rowStr);
- return output;
- }
-
- /**
- * Run a sql statement using the given connection.
- * @param connection The connection to be used for running sql
- * @param sql the sql to be run
- * @throws SQLException
- * @return output of the query as a List of strings
- */
- public static List<String> runSql(Connection connection, String sql) throws SQLException {
- Statement stmt = null;
- try {
- stmt = connection.createStatement();
- LOGGER.info("Executing: " + sql);
- stmt.execute(sql);
- final ResultSet resultSet = stmt.getResultSet();
- if (resultSet != null) {
- final List<String> output = fetchRows(resultSet);
- LOGGER.info("Results are:\n" + StringUtils.join(output, "\n"));
- return output;
- }
- LOGGER.info("Query executed.");
- } finally {
- if (stmt != null) {
- stmt.close();
- }
- }
- return new ArrayList<>();
- }
-}
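Finally, a sketch of the removed HiveUtil helpers; the JDBC URL, user and principal below are placeholders for a real HiveServer2 setup.

    import java.sql.Connection;
    import java.util.List;
    import org.apache.falcon.regression.core.util.HiveUtil;

    public class HiveUtilUsageSketch {
        public static void main(String[] args) throws Exception {
            Connection connection = HiveUtil.getHiveJdbcConnection(
                    "jdbc:hive2://hiveserver.example.com:10000", "falcon-qa", "",
                    "hive/_HOST@EXAMPLE.COM");
            HiveUtil.runSql(connection, "create database if not exists demo_db");
            List<String> rows = HiveUtil.runSql(connection, "show databases");
            for (String row : rows) {
                System.out.println(row);     // column header, row values and a row-count footer
            }
            connection.close();
        }
    }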