Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/12 20:52:43 UTC
[3/6] beam git commit: Move HIFIO k8s scripts into shared dir
http://git-wip-us.apache.org/repos/asf/beam/blob/34b95076/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
deleted file mode 100644
index 1658e2c..0000000
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
+++ /dev/null
@@ -1,299 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Script to populate test data on Elasticsearch.
-# The hashcode for 1000 records is ed36c09b5e24a95fd8d3cc711a043a85320bb47d.
-# For the test that selects one record out of 1000 docs via a query, the
-# expected hashcode is 83c108ff81e87b6f3807c638e6bb9a9e3d430dc7.
-# The hashcode for 50m records (~20 GB) is aff7390ee25c4c330f0a58dfbfe335421b11e405.
-
-import json
-import time
-import logging
-import random
-import string
-import uuid
-import datetime
-
-import tornado.gen
-import tornado.httpclient
-import tornado.ioloop
-import tornado.options
-
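-# Module-level state shared by the upload coroutines.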
-async_http_client = tornado.httpclient.AsyncHTTPClient()
-id_counter = 0
-upload_data_count = 0
-_dict_data = None
-
-
-def delete_index(idx_name):
- try:
- url = "%s/%s?refresh=true" % (tornado.options.options.es_url, idx_name)
- request = tornado.httpclient.HTTPRequest(url, method="DELETE", request_timeout=240,
- auth_username=tornado.options.options.username,
- auth_password=tornado.options.options.password)
- response = tornado.httpclient.HTTPClient().fetch(request)
- logging.info('Deleting index "%s" done %s' % (idx_name, response.body))
- except tornado.httpclient.HTTPError:
- pass # the index may not exist yet; a failed delete is fine to ignore
-
-
-def create_index(idx_name):
- schema = {
- "settings": {
- "number_of_shards": tornado.options.options.num_of_shards,
- "number_of_replicas": tornado.options.options.num_of_replicas
- },
- "refresh": True
- }
-
- body = json.dumps(schema)
- url = "%s/%s" % (tornado.options.options.es_url, idx_name)
- try:
- logging.info('Trying to create index %s' % (url))
- request = tornado.httpclient.HTTPRequest(url, method="PUT", body=body, request_timeout=240,
- auth_username=tornado.options.options.username,
- auth_password=tornado.options.options.password)
- response = tornado.httpclient.HTTPClient().fetch(request)
- logging.info('Creating index "%s" done %s' % (idx_name, response.body))
- except tornado.httpclient.HTTPError:
- logging.info('Looks like the index already exists')
-
-
-@tornado.gen.coroutine
-def upload_batch(upload_data_txt):
- try:
- request = tornado.httpclient.HTTPRequest(tornado.options.options.es_url + "/_bulk",
- method="POST", body=upload_data_txt,
- request_timeout=tornado.options.options.http_upload_timeout,
- auth_username=tornado.options.options.username,
- auth_password=tornado.options.options.password)
- response = yield async_http_client.fetch(request)
- except Exception as ex:
- logging.error("upload failed, error: %s" % ex)
- return
-
- result = json.loads(response.body.decode('utf-8'))
- res_txt = "OK" if not result['errors'] else "FAILED"
- took = int(result['took'])
- logging.info("Upload: %s - upload took: %5dms, total docs uploaded: %7d" % (res_txt, took,
- upload_data_count))
-
-
-def get_data_for_format(format, count):
- split_f = format.split(":")
- if len(split_f) < 2:
- return None, None
-
- field_name = split_f[0]
- field_type = split_f[1]
-
- return_val = ''
-
- if field_type == "bool":
- if count%2 == 0:
- return_val = True
- else:
- return_val = False
-
- elif field_type == "str":
- return_val = field_name + str(count)
-
- elif field_type == "int":
- return_val = count
-
- elif field_type == "ipv4":
- return_val = "{0}.{1}.{2}.{3}".format(1,2,3,count%255)
-
- elif field_type in ["ts", "tstxt"]:
- return_val = int(count * 1000) if field_type == "ts" else\
- datetime.datetime.fromtimestamp(count)\
- .strftime("%Y-%m-%dT%H:%M:%S.000-0000")
-
- elif field_type == "words":
- return_val = field_name + str(count)
-
- elif field_type == "dict":
- mydict = dict(a=field_name + str(count), b=field_name + str(count), c=field_name + str(count),
- d=field_name + str(count), e=field_name + str(count), f=field_name + str(count),
- g=field_name + str(count), h=field_name + str(count), i=field_name + str(count),
- j=field_name + str(count))
- return_val = ", ".join("%s=%s" % (k, v) for k, v in mydict.items())
-
- elif field_type == "text":
- return_val = field_name + str(count)
-
- return field_name, return_val
-
-
-def generate_count(min, max):
- if min == max:
- return max
- elif min > max:
- return random.randrange(max, min)
- else:
- return random.randrange(min, max)
-
-
-def generate_random_doc(format,count):
- global id_counter
-
- res = {}
-
- for f in format:
- f_key, f_val = get_data_for_format(f,count)
- if f_key:
- res[f_key] = f_val
-
- if not tornado.options.options.id_type:
- return res
-
- if tornado.options.options.id_type == 'int':
- res['_id'] = id_counter
- id_counter += 1
- elif tornado.options.options.id_type == 'uuid4':
- res['_id'] = str(uuid.uuid4())
-
- return res
-
-
-def set_index_refresh(val):
-
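- # Setting refresh_interval to -1 pauses index refresh during the bulk load.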
- params = {"index": {"refresh_interval": val}}
- body = json.dumps(params)
- url = "%s/%s/_settings" % (tornado.options.options.es_url, tornado.options.options.index_name)
- try:
- request = tornado.httpclient.HTTPRequest(url, method="PUT", body=body, request_timeout=240,
- auth_username=tornado.options.options.username,
- auth_password=tornado.options.options.password)
- http_client = tornado.httpclient.HTTPClient()
- http_client.fetch(request)
- logging.info('Set index refresh to %s' % val)
- except Exception as ex:
- logging.exception(ex)
-
-
-@tornado.gen.coroutine
-def generate_test_data():
-
- global upload_data_count
-
- if tornado.options.options.force_init_index:
- delete_index(tornado.options.options.index_name)
-
- create_index(tornado.options.options.index_name)
-
- # todo: query what refresh is set to, then restore later
- if tornado.options.options.set_refresh:
- set_index_refresh("-1")
-
- if tornado.options.options.out_file:
- out_file = open(tornado.options.options.out_file, "w")
- else:
- out_file = None
-
- if tornado.options.options.dict_file:
- global _dict_data
- with open(tornado.options.options.dict_file, 'r') as f:
- _dict_data = f.readlines()
- logging.info("Loaded %d words from %s" % (len(_dict_data),
- tornado.options.options.dict_file))
-
- if not tornado.options.options.format:
- logging.error('invalid format')
- exit(1)
- format = tornado.options.options.format.split(',')
-
- ts_start = int(time.time())
- upload_data_txt = ""
- total_uploaded = 0
-
- logging.info("Generating %d docs, upload batch size is %d" %
- (tornado.options.options.count, tornado.options.options.batch_size))
- for num in range(0, tornado.options.options.count):
-
- item = generate_random_doc(format,num)
-
- if out_file:
- out_file.write("%s\n" % json.dumps(item))
-
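- # Bulk-API framing: one action line naming the index/type, then the doc source.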
- cmd = {'index': {'_index': tornado.options.options.index_name,
- '_type': tornado.options.options.index_type}}
- if '_id' in item:
- cmd['index']['_id'] = item['_id']
-
- upload_data_txt += json.dumps(cmd) + "\n"
- upload_data_txt += json.dumps(item) + "\n"
- upload_data_count += 1
-
- if upload_data_count % tornado.options.options.batch_size == 0:
- yield upload_batch(upload_data_txt)
- upload_data_txt = ""
-
- # upload remaining items in `upload_data_txt`
- if upload_data_txt:
- yield upload_batch(upload_data_txt)
-
- if tornado.options.options.set_refresh:
- set_index_refresh("1s")
-
- if out_file:
- out_file.close()
-
- took_secs = int(time.time() - ts_start)
-
- logging.info("Done - total docs uploaded: %d, took %d seconds" %
- (tornado.options.options.count, took_secs))
-
-
-if __name__ == '__main__':
- tornado.options.define("es_url", type=str, default='http://localhost:9200/',
- help="URL of your Elasticsearch node")
- tornado.options.define("index_name", type=str, default='test_data',
- help="Name of the index to store your messages")
- tornado.options.define("index_type", type=str, default='test_type', help="Type")
- tornado.options.define("batch_size", type=int, default=1000,
- help="Elasticsearch bulk index batch size")
- tornado.options.define("num_of_shards", type=int, default=2,
- help="Number of shards for ES index")
- tornado.options.define("http_upload_timeout", type=int, default=3,
- help="Timeout in seconds when uploading data")
- tornado.options.define("count", type=int, default=100000, help="Number of docs to generate")
- tornado.options.define("format", type=str, default='name:str,age:int,last_updated:ts',
- help="message format")
- tornado.options.define("num_of_replicas", type=int, default=0,
- help="Number of replicas for ES index")
- tornado.options.define("force_init_index", type=bool, default=False,
- help="Force deleting and re-initializing the Elasticsearch index")
- tornado.options.define("set_refresh", type=bool, default=False,
- help="Set refresh rate to -1 before starting the upload")
- tornado.options.define("out_file", type=str, default=None,
- help="If set, write test data to out_file as well.")
- tornado.options.define("id_type", type=str, default=None,
- help="Type of 'id' to use for the docs, \
- valid settings are int and uuid4, None is default")
- tornado.options.define("dict_file", type=str, default=None,
- help="Name of dictionary file to use")
- tornado.options.define("username", type=str, default=None, help="Username for elasticsearch")
- tornado.options.define("password", type=str, default=None, help="Password for elasticsearch")
- tornado.options.parse_command_line()
-
- tornado.ioloop.IOLoop.instance().run_sync(generate_test_data)
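
For reference, the removed generator is configured entirely through the tornado.options flags defined in its __main__ block. A minimal invocation sketch, using illustrative values rather than the Beam test configuration (the flags themselves are the ones defined above):

    # Populate 1000 docs into a local Elasticsearch node; all flags are
    # defined in the script above, and the values here are examples only.
    python es_test_data.py \
      --es_url=http://localhost:9200 \
      --index_name=test_data \
      --count=1000 \
      --format=name:str,age:int,last_updated:ts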
http://git-wip-us.apache.org/repos/asf/beam/blob/34b95076/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
deleted file mode 100644
index 8fa912c..0000000
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/sh
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-set -e
-
-external_ip="$(kubectl get svc elasticsearch -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
-
-echo "Elasticsearch cluster health info"
-echo "---------------------------------"
-curl "$external_ip:9200/_cluster/health"
-echo # print a trailing newline; curl's output doesn't end with one
\ No newline at end of file
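
The removed health-check script assumes kubectl is pointed at the target cluster and that the elasticsearch service has been assigned a LoadBalancer ingress IP. A minimal sketch of performing the same check by hand, using only standard kubectl and curl (the service name matches the script above; the ?pretty parameter is optional):

    # Look up the external ingress IP of the service, then query cluster
    # health directly. Values other than the service name are illustrative.
    ip="$(kubectl get svc elasticsearch -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
    curl "http://${ip}:9200/_cluster/health?pretty"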